Cleanup network / transport related settings (#25489)
This commit makes the use of the global network settings explicit instead of implicit within NetworkService. It cleans up several places where we fall back to the global settings while we should have used tcp or http ones. In addition this change also removes unnecessary settings classes
This commit is contained in:
parent
2975e7f511
commit
5a7c8bb04e
|
@ -27,7 +27,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.http.HttpTransportSettings;
|
import org.elasticsearch.http.HttpTransportSettings;
|
||||||
import org.elasticsearch.plugins.PluginInfo;
|
import org.elasticsearch.plugins.PluginInfo;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
|
|
||||||
import java.io.FilePermission;
|
import java.io.FilePermission;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -324,8 +324,8 @@ final class Security {
|
||||||
final Permissions policy,
|
final Permissions policy,
|
||||||
final Settings settings) {
|
final Settings settings) {
|
||||||
// transport is way over-engineered
|
// transport is way over-engineered
|
||||||
final Map<String, Settings> profiles = new HashMap<>(TransportSettings.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups());
|
final Map<String, Settings> profiles = new HashMap<>(TcpTransport.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups());
|
||||||
profiles.putIfAbsent(TransportSettings.DEFAULT_PROFILE, Settings.EMPTY);
|
profiles.putIfAbsent(TcpTransport.DEFAULT_PROFILE, Settings.EMPTY);
|
||||||
|
|
||||||
// loop through all profiles and add permissions for each one, if it's valid; otherwise Netty transports are lenient and ignores it
|
// loop through all profiles and add permissions for each one, if it's valid; otherwise Netty transports are lenient and ignores it
|
||||||
for (final Map.Entry<String, Settings> entry : profiles.entrySet()) {
|
for (final Map.Entry<String, Settings> entry : profiles.entrySet()) {
|
||||||
|
@ -335,7 +335,7 @@ final class Security {
|
||||||
// a profile is only valid if it's the default profile, or if it has an actual name and specifies a port
|
// a profile is only valid if it's the default profile, or if it has an actual name and specifies a port
|
||||||
// TODO: can this leniency be removed?
|
// TODO: can this leniency be removed?
|
||||||
final boolean valid =
|
final boolean valid =
|
||||||
TransportSettings.DEFAULT_PROFILE.equals(name) ||
|
TcpTransport.DEFAULT_PROFILE.equals(name) ||
|
||||||
(name != null && name.length() > 0 && profileSettings.get("port") != null);
|
(name != null && name.length() > 0 && profileSettings.get("port") != null);
|
||||||
if (valid) {
|
if (valid) {
|
||||||
final String transportRange = profileSettings.get("port");
|
final String transportRange = profileSettings.get("port");
|
||||||
|
@ -355,7 +355,7 @@ final class Security {
|
||||||
* @param settings the {@link Settings} instance to read the transport settings from
|
* @param settings the {@link Settings} instance to read the transport settings from
|
||||||
*/
|
*/
|
||||||
private static void addSocketPermissionForTransport(final Permissions policy, final Settings settings) {
|
private static void addSocketPermissionForTransport(final Permissions policy, final Settings settings) {
|
||||||
final String transportRange = TransportSettings.PORT.get(settings);
|
final String transportRange = TcpTransport.PORT.get(settings);
|
||||||
addSocketPermissionForPortRange(policy, transportRange);
|
addSocketPermissionForPortRange(policy, transportRange);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,7 @@ public abstract class TransportClient extends AbstractClient {
|
||||||
final List<Closeable> resourcesToClose = new ArrayList<>();
|
final List<Closeable> resourcesToClose = new ArrayList<>();
|
||||||
final ThreadPool threadPool = new ThreadPool(settings);
|
final ThreadPool threadPool = new ThreadPool(settings);
|
||||||
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
|
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
|
||||||
final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
try {
|
try {
|
||||||
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
|
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
|
||||||
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
|
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
|
||||||
|
|
|
@ -19,11 +19,8 @@
|
||||||
|
|
||||||
package org.elasticsearch.common.network;
|
package org.elasticsearch.common.network;
|
||||||
|
|
||||||
import org.elasticsearch.common.Strings;
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
|
||||||
|
@ -31,38 +28,37 @@ import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class NetworkService extends AbstractComponent {
|
public final class NetworkService {
|
||||||
|
|
||||||
/** By default, we bind to loopback interfaces */
|
/** By default, we bind to loopback interfaces */
|
||||||
public static final String DEFAULT_NETWORK_HOST = "_local_";
|
public static final String DEFAULT_NETWORK_HOST = "_local_";
|
||||||
|
|
||||||
public static final Setting<List<String>> GLOBAL_NETWORK_HOST_SETTING =
|
public static final Setting<List<String>> GLOBAL_NETWORK_HOST_SETTING =
|
||||||
Setting.listSetting("network.host", Arrays.asList(DEFAULT_NETWORK_HOST), Function.identity(), Property.NodeScope);
|
Setting.listSetting("network.host", Collections.emptyList(), Function.identity(), Property.NodeScope);
|
||||||
public static final Setting<List<String>> GLOBAL_NETWORK_BINDHOST_SETTING =
|
public static final Setting<List<String>> GLOBAL_NETWORK_BINDHOST_SETTING =
|
||||||
Setting.listSetting("network.bind_host", GLOBAL_NETWORK_HOST_SETTING, Function.identity(), Property.NodeScope);
|
Setting.listSetting("network.bind_host", GLOBAL_NETWORK_HOST_SETTING, Function.identity(), Property.NodeScope);
|
||||||
public static final Setting<List<String>> GLOBAL_NETWORK_PUBLISHHOST_SETTING =
|
public static final Setting<List<String>> GLOBAL_NETWORK_PUBLISHHOST_SETTING =
|
||||||
Setting.listSetting("network.publish_host", GLOBAL_NETWORK_HOST_SETTING, Function.identity(), Property.NodeScope);
|
Setting.listSetting("network.publish_host", GLOBAL_NETWORK_HOST_SETTING, Function.identity(), Property.NodeScope);
|
||||||
public static final Setting<Boolean> NETWORK_SERVER = Setting.boolSetting("network.server", true, Property.NodeScope);
|
public static final Setting<Boolean> NETWORK_SERVER = Setting.boolSetting("network.server", true, Property.NodeScope);
|
||||||
|
|
||||||
public static final class TcpSettings {
|
public static final Setting<Boolean> TCP_NO_DELAY =
|
||||||
public static final Setting<Boolean> TCP_NO_DELAY =
|
Setting.boolSetting("network.tcp.no_delay", true, Property.NodeScope);
|
||||||
Setting.boolSetting("network.tcp.no_delay", true, Property.NodeScope);
|
public static final Setting<Boolean> TCP_KEEP_ALIVE =
|
||||||
public static final Setting<Boolean> TCP_KEEP_ALIVE =
|
Setting.boolSetting("network.tcp.keep_alive", true, Property.NodeScope);
|
||||||
Setting.boolSetting("network.tcp.keep_alive", true, Property.NodeScope);
|
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
||||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
Setting.boolSetting("network.tcp.reuse_address", NetworkUtils.defaultReuseAddress(), Property.NodeScope);
|
||||||
Setting.boolSetting("network.tcp.reuse_address", NetworkUtils.defaultReuseAddress(), Property.NodeScope);
|
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
|
||||||
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
|
Setting.byteSizeSetting("network.tcp.send_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
|
||||||
Setting.byteSizeSetting("network.tcp.send_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
|
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
|
||||||
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
|
Setting.byteSizeSetting("network.tcp.receive_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
|
||||||
Setting.byteSizeSetting("network.tcp.receive_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
|
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
|
||||||
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
|
Setting.timeSetting("network.tcp.connect_timeout", new TimeValue(30, TimeUnit.SECONDS), Property.NodeScope);
|
||||||
Setting.timeSetting("network.tcp.connect_timeout", new TimeValue(30, TimeUnit.SECONDS), Property.NodeScope);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A custom name resolver can support custom lookup keys (my_net_key:ipv4) and also change
|
* A custom name resolver can support custom lookup keys (my_net_key:ipv4) and also change
|
||||||
|
@ -82,9 +78,8 @@ public class NetworkService extends AbstractComponent {
|
||||||
|
|
||||||
private final List<CustomNameResolver> customNameResolvers;
|
private final List<CustomNameResolver> customNameResolvers;
|
||||||
|
|
||||||
public NetworkService(Settings settings, List<CustomNameResolver> customNameResolvers) {
|
public NetworkService(List<CustomNameResolver> customNameResolvers) {
|
||||||
super(settings);
|
this.customNameResolvers = Objects.requireNonNull(customNameResolvers, "customNameResolvers must be non null");
|
||||||
this.customNameResolvers = customNameResolvers;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,29 +87,20 @@ public class NetworkService extends AbstractComponent {
|
||||||
* not contain duplicate addresses.
|
* not contain duplicate addresses.
|
||||||
*
|
*
|
||||||
* @param bindHosts list of hosts to bind to. this may contain special pseudo-hostnames
|
* @param bindHosts list of hosts to bind to. this may contain special pseudo-hostnames
|
||||||
* such as _local_ (see the documentation). if it is null, it will be populated
|
* such as _local_ (see the documentation). if it is null, it will fall back to _local_
|
||||||
* based on global default settings.
|
*
|
||||||
* @return unique set of internet addresses
|
* @return unique set of internet addresses
|
||||||
*/
|
*/
|
||||||
public InetAddress[] resolveBindHostAddresses(String bindHosts[]) throws IOException {
|
public InetAddress[] resolveBindHostAddresses(String bindHosts[]) throws IOException {
|
||||||
// first check settings
|
|
||||||
if (bindHosts == null || bindHosts.length == 0) {
|
if (bindHosts == null || bindHosts.length == 0) {
|
||||||
if (GLOBAL_NETWORK_BINDHOST_SETTING.exists(settings) || GLOBAL_NETWORK_HOST_SETTING.exists(settings)) {
|
for (CustomNameResolver customNameResolver : customNameResolvers) {
|
||||||
// if we have settings use them (we have a fallback to GLOBAL_NETWORK_HOST_SETTING inline
|
InetAddress addresses[] = customNameResolver.resolveDefault();
|
||||||
bindHosts = GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
if (addresses != null) {
|
||||||
} else {
|
return addresses;
|
||||||
// next check any registered custom resolvers if any
|
|
||||||
if (customNameResolvers != null) {
|
|
||||||
for (CustomNameResolver customNameResolver : customNameResolvers) {
|
|
||||||
InetAddress addresses[] = customNameResolver.resolveDefault();
|
|
||||||
if (addresses != null) {
|
|
||||||
return addresses;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// we know it's not here. get the defaults
|
|
||||||
bindHosts = GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
|
||||||
}
|
}
|
||||||
|
// we know it's not here. get the defaults
|
||||||
|
bindHosts = new String[] {"_local_"};
|
||||||
}
|
}
|
||||||
|
|
||||||
InetAddress addresses[] = resolveInetAddresses(bindHosts);
|
InetAddress addresses[] = resolveInetAddresses(bindHosts);
|
||||||
|
@ -140,29 +126,20 @@ public class NetworkService extends AbstractComponent {
|
||||||
* If {@code publishHosts} resolves to more than one address, <b>then one is selected with magic</b>
|
* If {@code publishHosts} resolves to more than one address, <b>then one is selected with magic</b>
|
||||||
*
|
*
|
||||||
* @param publishHosts list of hosts to publish as. this may contain special pseudo-hostnames
|
* @param publishHosts list of hosts to publish as. this may contain special pseudo-hostnames
|
||||||
* such as _local_ (see the documentation). if it is null, it will be populated
|
* such as _local_ (see the documentation). if it is null, it will fall back to _local_
|
||||||
* based on global default settings.
|
|
||||||
* @return single internet address
|
* @return single internet address
|
||||||
*/
|
*/
|
||||||
// TODO: needs to be InetAddress[]
|
// TODO: needs to be InetAddress[]
|
||||||
public InetAddress resolvePublishHostAddresses(String publishHosts[]) throws IOException {
|
public InetAddress resolvePublishHostAddresses(String publishHosts[]) throws IOException {
|
||||||
if (publishHosts == null || publishHosts.length == 0) {
|
if (publishHosts == null || publishHosts.length == 0) {
|
||||||
if (GLOBAL_NETWORK_PUBLISHHOST_SETTING.exists(settings) || GLOBAL_NETWORK_HOST_SETTING.exists(settings)) {
|
for (CustomNameResolver customNameResolver : customNameResolvers) {
|
||||||
// if we have settings use them (we have a fallback to GLOBAL_NETWORK_HOST_SETTING inline
|
InetAddress addresses[] = customNameResolver.resolveDefault();
|
||||||
publishHosts = GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
if (addresses != null) {
|
||||||
} else {
|
return addresses[0];
|
||||||
// next check any registered custom resolvers if any
|
|
||||||
if (customNameResolvers != null) {
|
|
||||||
for (CustomNameResolver customNameResolver : customNameResolvers) {
|
|
||||||
InetAddress addresses[] = customNameResolver.resolveDefault();
|
|
||||||
if (addresses != null) {
|
|
||||||
return addresses[0];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// we know it's not here. get the defaults
|
|
||||||
publishHosts = GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
|
||||||
}
|
}
|
||||||
|
// we know it's not here. get the defaults
|
||||||
|
publishHosts = new String[] {DEFAULT_NETWORK_HOST};
|
||||||
}
|
}
|
||||||
|
|
||||||
InetAddress addresses[] = resolveInetAddresses(publishHosts);
|
InetAddress addresses[] = resolveInetAddresses(publishHosts);
|
||||||
|
|
|
@ -91,7 +91,6 @@ import org.elasticsearch.transport.RemoteClusterService;
|
||||||
import org.elasticsearch.transport.TcpTransport;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
import org.elasticsearch.tribe.TribeService;
|
import org.elasticsearch.tribe.TribeService;
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
|
|
||||||
|
@ -270,12 +269,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
|
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
|
||||||
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
|
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
|
||||||
Transport.TRANSPORT_TCP_COMPRESS,
|
Transport.TRANSPORT_TCP_COMPRESS,
|
||||||
TransportSettings.TRANSPORT_PROFILES_SETTING,
|
TcpTransport.TRANSPORT_PROFILES_SETTING,
|
||||||
TransportSettings.HOST,
|
TcpTransport.HOST,
|
||||||
TransportSettings.PUBLISH_HOST,
|
TcpTransport.PUBLISH_HOST,
|
||||||
TransportSettings.BIND_HOST,
|
TcpTransport.BIND_HOST,
|
||||||
TransportSettings.PUBLISH_PORT,
|
TcpTransport.PUBLISH_PORT,
|
||||||
TransportSettings.PORT,
|
TcpTransport.PORT,
|
||||||
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
|
||||||
TcpTransport.CONNECTIONS_PER_NODE_BULK,
|
TcpTransport.CONNECTIONS_PER_NODE_BULK,
|
||||||
TcpTransport.CONNECTIONS_PER_NODE_REG,
|
TcpTransport.CONNECTIONS_PER_NODE_REG,
|
||||||
|
@ -292,12 +291,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
||||||
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
||||||
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
||||||
NetworkService.TcpSettings.TCP_NO_DELAY,
|
NetworkService.TCP_NO_DELAY,
|
||||||
NetworkService.TcpSettings.TCP_KEEP_ALIVE,
|
NetworkService.TCP_KEEP_ALIVE,
|
||||||
NetworkService.TcpSettings.TCP_REUSE_ADDRESS,
|
NetworkService.TCP_REUSE_ADDRESS,
|
||||||
NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE,
|
NetworkService.TCP_SEND_BUFFER_SIZE,
|
||||||
NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE,
|
NetworkService.TCP_RECEIVE_BUFFER_SIZE,
|
||||||
NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT,
|
NetworkService.TCP_CONNECT_TIMEOUT,
|
||||||
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
|
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
|
||||||
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
|
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
|
||||||
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
|
ScriptService.SCRIPT_CACHE_SIZE_SETTING,
|
||||||
|
|
|
@ -52,7 +52,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingService;
|
import org.elasticsearch.cluster.routing.RoutingService;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.StopWatch;
|
import org.elasticsearch.common.StopWatch;
|
||||||
import org.elasticsearch.common.SuppressForbidden;
|
|
||||||
import org.elasticsearch.common.component.Lifecycle;
|
import org.elasticsearch.common.component.Lifecycle;
|
||||||
import org.elasticsearch.common.component.LifecycleComponent;
|
import org.elasticsearch.common.component.LifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Binder;
|
import org.elasticsearch.common.inject.Binder;
|
||||||
|
@ -61,7 +60,6 @@ import org.elasticsearch.common.inject.Key;
|
||||||
import org.elasticsearch.common.inject.Module;
|
import org.elasticsearch.common.inject.Module;
|
||||||
import org.elasticsearch.common.inject.ModulesBuilder;
|
import org.elasticsearch.common.inject.ModulesBuilder;
|
||||||
import org.elasticsearch.common.inject.util.Providers;
|
import org.elasticsearch.common.inject.util.Providers;
|
||||||
import org.elasticsearch.common.io.PathUtils;
|
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||||
|
@ -151,9 +149,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -330,7 +326,7 @@ public class Node implements Closeable {
|
||||||
final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
|
final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
|
||||||
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
|
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
|
||||||
resourcesToClose.add(resourceWatcherService);
|
resourcesToClose.add(resourceWatcherService);
|
||||||
final NetworkService networkService = new NetworkService(settings,
|
final NetworkService networkService = new NetworkService(
|
||||||
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
|
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
|
||||||
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
|
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
|
||||||
clusterService.addListener(scriptModule.getScriptService());
|
clusterService.addListener(scriptModule.getScriptService());
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.NotifyOnceListener;
|
import org.elasticsearch.action.NotifyOnceListener;
|
||||||
|
@ -67,7 +66,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.util.iterable.Iterables;
|
|
||||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
|
@ -103,13 +101,17 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static java.util.Collections.emptyList;
|
||||||
import static java.util.Collections.unmodifiableMap;
|
import static java.util.Collections.unmodifiableMap;
|
||||||
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
||||||
|
import static org.elasticsearch.common.settings.Setting.groupSetting;
|
||||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
import static org.elasticsearch.common.settings.Setting.intSetting;
|
||||||
|
import static org.elasticsearch.common.settings.Setting.listSetting;
|
||||||
import static org.elasticsearch.common.settings.Setting.timeSetting;
|
import static org.elasticsearch.common.settings.Setting.timeSetting;
|
||||||
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
|
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
|
||||||
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
|
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
|
||||||
|
@ -120,6 +122,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";
|
public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";
|
||||||
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
|
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
|
||||||
|
|
||||||
|
public static final Setting<List<String>> HOST =
|
||||||
|
listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope);
|
||||||
|
public static final Setting<List<String>> BIND_HOST =
|
||||||
|
listSetting("transport.bind_host", HOST, Function.identity(), Setting.Property.NodeScope);
|
||||||
|
public static final Setting<List<String>> PUBLISH_HOST =
|
||||||
|
listSetting("transport.publish_host", HOST, Function.identity(), Setting.Property.NodeScope);
|
||||||
|
public static final Setting<String> PORT =
|
||||||
|
new Setting<>("transport.tcp.port", "9300-9400", Function.identity(), Setting.Property.NodeScope);
|
||||||
|
public static final Setting<Integer> PUBLISH_PORT =
|
||||||
|
intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope);
|
||||||
|
public static final String DEFAULT_PROFILE = "default";
|
||||||
|
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING =
|
||||||
|
groupSetting("transport.profiles.", Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||||
// the scheduled internal ping interval setting, defaults to disabled (-1)
|
// the scheduled internal ping interval setting, defaults to disabled (-1)
|
||||||
public static final Setting<TimeValue> PING_SCHEDULE =
|
public static final Setting<TimeValue> PING_SCHEDULE =
|
||||||
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
|
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
|
||||||
|
@ -134,20 +149,21 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
|
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
|
||||||
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
|
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
|
||||||
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
|
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
|
||||||
timeSetting("transport.tcp.connect_timeout", NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
|
timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
|
||||||
public static final Setting<Boolean> TCP_NO_DELAY =
|
public static final Setting<Boolean> TCP_NO_DELAY =
|
||||||
boolSetting("transport.tcp_no_delay", NetworkService.TcpSettings.TCP_NO_DELAY, Setting.Property.NodeScope);
|
boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
|
||||||
public static final Setting<Boolean> TCP_KEEP_ALIVE =
|
public static final Setting<Boolean> TCP_KEEP_ALIVE =
|
||||||
boolSetting("transport.tcp.keep_alive", NetworkService.TcpSettings.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
|
boolSetting("transport.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
|
||||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
||||||
boolSetting("transport.tcp.reuse_address", NetworkService.TcpSettings.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
|
boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
|
||||||
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
|
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
|
||||||
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE,
|
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE,
|
||||||
Setting.Property.NodeScope);
|
Setting.Property.NodeScope);
|
||||||
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
|
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
|
||||||
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE,
|
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE,
|
||||||
Setting.Property.NodeScope);
|
Setting.Property.NodeScope);
|
||||||
|
|
||||||
|
|
||||||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
|
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
|
||||||
private static final int PING_DATA_SIZE = -1;
|
private static final int PING_DATA_SIZE = -1;
|
||||||
private final CircuitBreakerService circuitBreakerService;
|
private final CircuitBreakerService circuitBreakerService;
|
||||||
|
@ -650,12 +666,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
|
|
||||||
protected Map<String, Settings> buildProfileSettings() {
|
protected Map<String, Settings> buildProfileSettings() {
|
||||||
// extract default profile first and create standard bootstrap
|
// extract default profile first and create standard bootstrap
|
||||||
Map<String, Settings> profiles = TransportSettings.TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true);
|
Map<String, Settings> profiles = TRANSPORT_PROFILES_SETTING.get(settings).getAsGroups(true);
|
||||||
if (!profiles.containsKey(TransportSettings.DEFAULT_PROFILE)) {
|
if (!profiles.containsKey(DEFAULT_PROFILE)) {
|
||||||
profiles = new HashMap<>(profiles);
|
profiles = new HashMap<>(profiles);
|
||||||
profiles.put(TransportSettings.DEFAULT_PROFILE, Settings.EMPTY);
|
profiles.put(DEFAULT_PROFILE, Settings.EMPTY);
|
||||||
}
|
}
|
||||||
Settings defaultSettings = profiles.get(TransportSettings.DEFAULT_PROFILE);
|
Settings defaultSettings = profiles.get(DEFAULT_PROFILE);
|
||||||
Map<String, Settings> result = new HashMap<>();
|
Map<String, Settings> result = new HashMap<>();
|
||||||
// loop through all profiles and start them up, special handling for default one
|
// loop through all profiles and start them up, special handling for default one
|
||||||
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
|
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
|
||||||
|
@ -666,10 +682,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
logger.info("transport profile configured without a name. skipping profile with settings [{}]",
|
logger.info("transport profile configured without a name. skipping profile with settings [{}]",
|
||||||
profileSettings.toDelimitedString(','));
|
profileSettings.toDelimitedString(','));
|
||||||
continue;
|
continue;
|
||||||
} else if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
|
} else if (DEFAULT_PROFILE.equals(name)) {
|
||||||
profileSettings = Settings.builder()
|
profileSettings = Settings.builder()
|
||||||
.put(profileSettings)
|
.put(profileSettings)
|
||||||
.put("port", profileSettings.get("port", TransportSettings.PORT.get(this.settings)))
|
.put("port", profileSettings.get("port", PORT.get(this.settings)))
|
||||||
.build();
|
.build();
|
||||||
} else if (profileSettings.get("port") == null) {
|
} else if (profileSettings.get("port") == null) {
|
||||||
// if profile does not have a port, skip it
|
// if profile does not have a port, skip it
|
||||||
|
@ -696,10 +712,11 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
return local;
|
return local;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void bindServer(final String name, final Settings settings) {
|
protected void bindServer(final String name, final Settings profileSettings) {
|
||||||
// Bind and start to accept incoming connections.
|
// Bind and start to accept incoming connections.
|
||||||
InetAddress hostAddresses[];
|
InetAddress hostAddresses[];
|
||||||
String bindHosts[] = settings.getAsArray("bind_host", null);
|
String bindHosts[] = profileSettings.getAsArray("bind_host",
|
||||||
|
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY));
|
||||||
try {
|
try {
|
||||||
hostAddresses = networkService.resolveBindHostAddresses(bindHosts);
|
hostAddresses = networkService.resolveBindHostAddresses(bindHosts);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -717,12 +734,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
|
|
||||||
List<InetSocketAddress> boundAddresses = new ArrayList<>();
|
List<InetSocketAddress> boundAddresses = new ArrayList<>();
|
||||||
for (InetAddress hostAddress : hostAddresses) {
|
for (InetAddress hostAddress : hostAddresses) {
|
||||||
boundAddresses.add(bindToPort(name, hostAddress, settings.get("port")));
|
boundAddresses.add(bindToPort(name, hostAddress, profileSettings.get("port")));
|
||||||
}
|
}
|
||||||
|
|
||||||
final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, settings, boundAddresses);
|
final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, profileSettings, boundAddresses);
|
||||||
|
|
||||||
if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
|
if (DEFAULT_PROFILE.equals(name)) {
|
||||||
this.boundAddress = boundTransportAddress;
|
this.boundAddress = boundTransportAddress;
|
||||||
} else {
|
} else {
|
||||||
profileBoundAddresses.put(name, boundTransportAddress);
|
profileBoundAddresses.put(name, boundTransportAddress);
|
||||||
|
@ -772,12 +789,15 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
transportBoundAddresses[i] = new TransportAddress(boundAddress);
|
transportBoundAddresses[i] = new TransportAddress(boundAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String[] publishHosts;
|
String[] publishHosts;
|
||||||
if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
|
if (DEFAULT_PROFILE.equals(name)) {
|
||||||
publishHosts = TransportSettings.PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
|
publishHosts = PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
|
||||||
} else {
|
} else {
|
||||||
publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings);
|
publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings);
|
||||||
}
|
}
|
||||||
|
if (publishHosts == null || publishHosts.length == 0) {
|
||||||
|
publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
||||||
|
}
|
||||||
|
|
||||||
final InetAddress publishInetAddress;
|
final InetAddress publishInetAddress;
|
||||||
try {
|
try {
|
||||||
|
@ -795,8 +815,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
public static int resolvePublishPort(String profileName, Settings settings, Settings profileSettings,
|
public static int resolvePublishPort(String profileName, Settings settings, Settings profileSettings,
|
||||||
List<InetSocketAddress> boundAddresses, InetAddress publishInetAddress) {
|
List<InetSocketAddress> boundAddresses, InetAddress publishInetAddress) {
|
||||||
int publishPort;
|
int publishPort;
|
||||||
if (TransportSettings.DEFAULT_PROFILE.equals(profileName)) {
|
if (DEFAULT_PROFILE.equals(profileName)) {
|
||||||
publishPort = TransportSettings.PUBLISH_PORT.get(settings);
|
publishPort = PUBLISH_PORT.get(settings);
|
||||||
} else {
|
} else {
|
||||||
publishPort = profileSettings.getAsInt("publish_port", -1);
|
publishPort = profileSettings.getAsInt("publish_port", -1);
|
||||||
}
|
}
|
||||||
|
@ -824,18 +844,18 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
}
|
}
|
||||||
|
|
||||||
if (publishPort < 0) {
|
if (publishPort < 0) {
|
||||||
String profileExplanation = TransportSettings.DEFAULT_PROFILE.equals(profileName) ? "" : " for profile " + profileName;
|
String profileExplanation = DEFAULT_PROFILE.equals(profileName) ? "" : " for profile " + profileName;
|
||||||
throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " +
|
throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " +
|
||||||
boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " +
|
boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). " +
|
||||||
"Please specify a unique port by setting " + TransportSettings.PORT.getKey() + " or " +
|
"Please specify a unique port by setting " + PORT.getKey() + " or " +
|
||||||
TransportSettings.PUBLISH_PORT.getKey());
|
PUBLISH_PORT.getKey());
|
||||||
}
|
}
|
||||||
return publishPort;
|
return publishPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||||
return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
|
return parse(address, settings.get("transport.profiles.default.port", PORT.get(settings)), perAddressLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// this code is a take on guava's HostAndPort, like a HostAndPortRange
|
// this code is a take on guava's HostAndPort, like a HostAndPortRange
|
||||||
|
|
|
@ -1,56 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.transport;
|
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
|
||||||
import static org.elasticsearch.common.settings.Setting.groupSetting;
|
|
||||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
|
||||||
import static org.elasticsearch.common.settings.Setting.listSetting;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* a collection of settings related to transport components, which are also needed in org.elasticsearch.bootstrap.Security
|
|
||||||
* This class should only contain static code which is *safe* to load before the security manager is enforced.
|
|
||||||
*/
|
|
||||||
public final class TransportSettings {
|
|
||||||
|
|
||||||
public static final Setting<List<String>> HOST =
|
|
||||||
listSetting("transport.host", emptyList(), Function.identity(), Property.NodeScope);
|
|
||||||
public static final Setting<List<String>> PUBLISH_HOST =
|
|
||||||
listSetting("transport.publish_host", HOST, Function.identity(), Property.NodeScope);
|
|
||||||
public static final Setting<List<String>> BIND_HOST =
|
|
||||||
listSetting("transport.bind_host", HOST, Function.identity(), Property.NodeScope);
|
|
||||||
public static final Setting<String> PORT =
|
|
||||||
new Setting<>("transport.tcp.port", "9300-9400", Function.identity(), Property.NodeScope);
|
|
||||||
public static final Setting<Integer> PUBLISH_PORT =
|
|
||||||
intSetting("transport.publish_port", -1, -1, Property.NodeScope);
|
|
||||||
public static final String DEFAULT_PROFILE = "default";
|
|
||||||
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING =
|
|
||||||
groupSetting("transport.profiles.", Property.Dynamic, Property.NodeScope);
|
|
||||||
|
|
||||||
private TransportSettings() {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -65,7 +65,7 @@ import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -207,9 +207,9 @@ public class TribeService extends AbstractLifecycleComponent {
|
||||||
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
|
||||||
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
|
||||||
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
|
||||||
TransportSettings.HOST,
|
TcpTransport.HOST,
|
||||||
TransportSettings.BIND_HOST,
|
TcpTransport.BIND_HOST,
|
||||||
TransportSettings.PUBLISH_HOST
|
TcpTransport.PUBLISH_HOST
|
||||||
);
|
);
|
||||||
private final String onConflict;
|
private final String onConflict;
|
||||||
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();
|
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();
|
||||||
|
|
|
@ -176,7 +176,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
||||||
transportService = new TransportService(settings,
|
transportService = new TransportService(settings,
|
||||||
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
|
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
|
||||||
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
|
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
|
||||||
new NetworkService(settings, Collections.emptyList())),
|
new NetworkService(Collections.emptyList())),
|
||||||
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) {
|
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null) {
|
||||||
@Override
|
@Override
|
||||||
protected TaskManager createTaskManager() {
|
protected TaskManager createTaskManager() {
|
||||||
|
|
|
@ -93,7 +93,7 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY,
|
MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY,
|
||||||
threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, new NamedWriteableRegistry(Collections.emptyList()),
|
threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, new NamedWriteableRegistry(Collections.emptyList()),
|
||||||
new NetworkService(Settings.EMPTY, Collections.emptyList()));
|
new NetworkService(Collections.emptyList()));
|
||||||
clusterService = createClusterService(threadPool);
|
clusterService = createClusterService(threadPool);
|
||||||
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
|
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
|
||||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
|
||||||
|
|
|
@ -56,7 +56,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
|
@ -91,13 +90,10 @@ import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -941,7 +937,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
final ReplicationTask task = maybeTask();
|
final ReplicationTask task = maybeTask();
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
|
||||||
Version.CURRENT);
|
Version.CURRENT);
|
||||||
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||||
x -> clusterService.localNode(),null);
|
x -> clusterService.localNode(),null);
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.common.network;
|
package org.elasticsearch.common.network;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
@ -37,7 +36,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure exception if we bind to multicast ipv4 address
|
* ensure exception if we bind to multicast ipv4 address
|
||||||
*/
|
*/
|
||||||
public void testBindMulticastV4() throws Exception {
|
public void testBindMulticastV4() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
try {
|
try {
|
||||||
service.resolveBindHostAddresses(new String[] { "239.1.1.1" });
|
service.resolveBindHostAddresses(new String[] { "239.1.1.1" });
|
||||||
fail("should have hit exception");
|
fail("should have hit exception");
|
||||||
|
@ -49,7 +48,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure exception if we bind to multicast ipv6 address
|
* ensure exception if we bind to multicast ipv6 address
|
||||||
*/
|
*/
|
||||||
public void testBindMulticastV6() throws Exception {
|
public void testBindMulticastV6() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
try {
|
try {
|
||||||
service.resolveBindHostAddresses(new String[] { "FF08::108" });
|
service.resolveBindHostAddresses(new String[] { "FF08::108" });
|
||||||
fail("should have hit exception");
|
fail("should have hit exception");
|
||||||
|
@ -62,7 +61,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure exception if we publish to multicast ipv4 address
|
* ensure exception if we publish to multicast ipv4 address
|
||||||
*/
|
*/
|
||||||
public void testPublishMulticastV4() throws Exception {
|
public void testPublishMulticastV4() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
try {
|
try {
|
||||||
service.resolvePublishHostAddresses(new String[] { "239.1.1.1" });
|
service.resolvePublishHostAddresses(new String[] { "239.1.1.1" });
|
||||||
fail("should have hit exception");
|
fail("should have hit exception");
|
||||||
|
@ -75,7 +74,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure exception if we publish to multicast ipv6 address
|
* ensure exception if we publish to multicast ipv6 address
|
||||||
*/
|
*/
|
||||||
public void testPublishMulticastV6() throws Exception {
|
public void testPublishMulticastV6() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
try {
|
try {
|
||||||
service.resolvePublishHostAddresses(new String[] { "FF08::108" });
|
service.resolvePublishHostAddresses(new String[] { "FF08::108" });
|
||||||
fail("should have hit exception");
|
fail("should have hit exception");
|
||||||
|
@ -88,15 +87,16 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure specifying wildcard ipv4 address will bind to all interfaces
|
* ensure specifying wildcard ipv4 address will bind to all interfaces
|
||||||
*/
|
*/
|
||||||
public void testBindAnyLocalV4() throws Exception {
|
public void testBindAnyLocalV4() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
assertEquals(InetAddress.getByName("0.0.0.0"), service.resolveBindHostAddresses(new String[] { "0.0.0.0" })[0]);
|
assertEquals(InetAddress.getByName("0.0.0.0"), service.resolveBindHostAddresses(new String[] { "0.0.0.0" }
|
||||||
|
)[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ensure specifying wildcard ipv6 address will bind to all interfaces
|
* ensure specifying wildcard ipv6 address will bind to all interfaces
|
||||||
*/
|
*/
|
||||||
public void testBindAnyLocalV6() throws Exception {
|
public void testBindAnyLocalV6() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
assertEquals(InetAddress.getByName("::"), service.resolveBindHostAddresses(new String[] { "::" })[0]);
|
assertEquals(InetAddress.getByName("::"), service.resolveBindHostAddresses(new String[] { "::" })[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure specifying wildcard ipv4 address selects reasonable publish address
|
* ensure specifying wildcard ipv4 address selects reasonable publish address
|
||||||
*/
|
*/
|
||||||
public void testPublishAnyLocalV4() throws Exception {
|
public void testPublishAnyLocalV4() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
InetAddress address = service.resolvePublishHostAddresses(new String[] { "0.0.0.0" });
|
InetAddress address = service.resolvePublishHostAddresses(new String[] { "0.0.0.0" });
|
||||||
assertFalse(address.isAnyLocalAddress());
|
assertFalse(address.isAnyLocalAddress());
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure specifying wildcard ipv6 address selects reasonable publish address
|
* ensure specifying wildcard ipv6 address selects reasonable publish address
|
||||||
*/
|
*/
|
||||||
public void testPublishAnyLocalV6() throws Exception {
|
public void testPublishAnyLocalV6() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
InetAddress address = service.resolvePublishHostAddresses(new String[] { "::" });
|
InetAddress address = service.resolvePublishHostAddresses(new String[] { "::" });
|
||||||
assertFalse(address.isAnyLocalAddress());
|
assertFalse(address.isAnyLocalAddress());
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure we can bind to multiple addresses
|
* ensure we can bind to multiple addresses
|
||||||
*/
|
*/
|
||||||
public void testBindMultipleAddresses() throws Exception {
|
public void testBindMultipleAddresses() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
InetAddress[] addresses = service.resolveBindHostAddresses(new String[]{"127.0.0.1", "127.0.0.2"});
|
InetAddress[] addresses = service.resolveBindHostAddresses(new String[]{"127.0.0.1", "127.0.0.2"});
|
||||||
assertThat(addresses.length, is(2));
|
assertThat(addresses.length, is(2));
|
||||||
}
|
}
|
||||||
|
@ -131,7 +131,7 @@ public class NetworkServiceTests extends ESTestCase {
|
||||||
* ensure we can't bind to multiple addresses when using wildcard
|
* ensure we can't bind to multiple addresses when using wildcard
|
||||||
*/
|
*/
|
||||||
public void testBindMultipleAddressesWithWildcard() throws Exception {
|
public void testBindMultipleAddressesWithWildcard() throws Exception {
|
||||||
NetworkService service = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
NetworkService service = new NetworkService(Collections.emptyList());
|
||||||
try {
|
try {
|
||||||
service.resolveBindHostAddresses(new String[]{"0.0.0.0", "127.0.0.1"});
|
service.resolveBindHostAddresses(new String[]{"0.0.0.0", "127.0.0.1"});
|
||||||
fail("should have hit exception");
|
fail("should have hit exception");
|
||||||
|
|
|
@ -140,7 +140,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
|
||||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), singleton(TransportLivenessAction.NAME))
|
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), singleton(TransportLivenessAction.NAME))
|
||||||
.build(),
|
.build(),
|
||||||
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService,
|
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService,
|
||||||
namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version),
|
namedWriteableRegistry, new NetworkService(Collections.emptyList()), version),
|
||||||
threadPool,
|
threadPool,
|
||||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||||
(boundAddress) ->
|
(boundAddress) ->
|
||||||
|
|
|
@ -49,13 +49,13 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.ConnectTransportException;
|
import org.elasticsearch.transport.ConnectTransportException;
|
||||||
import org.elasticsearch.transport.ConnectionProfile;
|
import org.elasticsearch.transport.ConnectionProfile;
|
||||||
import org.elasticsearch.transport.MockTcpTransport;
|
import org.elasticsearch.transport.MockTcpTransport;
|
||||||
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportConnectionListener;
|
import org.elasticsearch.transport.TransportConnectionListener;
|
||||||
import org.elasticsearch.transport.TransportException;
|
import org.elasticsearch.transport.TransportException;
|
||||||
import org.elasticsearch.transport.TransportRequestOptions;
|
import org.elasticsearch.transport.TransportRequestOptions;
|
||||||
import org.elasticsearch.transport.TransportResponseHandler;
|
import org.elasticsearch.transport.TransportResponseHandler;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.mockito.Matchers;
|
import org.mockito.Matchers;
|
||||||
|
@ -137,11 +137,11 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
|
|
||||||
public void testSimplePings() throws IOException, InterruptedException, ExecutionException {
|
public void testSimplePings() throws IOException, InterruptedException, ExecutionException {
|
||||||
// use ephemeral ports
|
// use ephemeral ports
|
||||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
final Settings settings = Settings.builder().put("cluster.name", "test").put(TcpTransport.PORT.getKey(), 0).build();
|
||||||
final Settings settingsMismatch =
|
final Settings settingsMismatch =
|
||||||
Settings.builder().put(settings).put("cluster.name", "mismatch").put(TransportSettings.PORT.getKey(), 0).build();
|
Settings.builder().put(settings).put("cluster.name", "mismatch").put(TcpTransport.PORT.getKey(), 0).build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
|
|
||||||
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
||||||
s,
|
s,
|
||||||
|
@ -262,9 +262,9 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
|
|
||||||
public void testUnknownHostNotCached() throws ExecutionException, InterruptedException {
|
public void testUnknownHostNotCached() throws ExecutionException, InterruptedException {
|
||||||
// use ephemeral ports
|
// use ephemeral ports
|
||||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
final Settings settings = Settings.builder().put("cluster.name", "test").put(TcpTransport.PORT.getKey(), 0).build();
|
||||||
|
|
||||||
final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
|
|
||||||
final Map<String, TransportAddress[]> addresses = new HashMap<>();
|
final Map<String, TransportAddress[]> addresses = new HashMap<>();
|
||||||
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
||||||
|
@ -370,7 +370,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPortLimit() throws InterruptedException {
|
public void testPortLimit() throws InterruptedException {
|
||||||
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
final Transport transport = new MockTcpTransport(
|
final Transport transport = new MockTcpTransport(
|
||||||
Settings.EMPTY,
|
Settings.EMPTY,
|
||||||
threadPool,
|
threadPool,
|
||||||
|
@ -411,7 +411,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRemovingLocalAddresses() throws InterruptedException {
|
public void testRemovingLocalAddresses() throws InterruptedException {
|
||||||
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
|
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
|
||||||
final Transport transport = new MockTcpTransport(
|
final Transport transport = new MockTcpTransport(
|
||||||
Settings.EMPTY,
|
Settings.EMPTY,
|
||||||
|
@ -456,7 +456,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
|
|
||||||
public void testUnknownHost() throws InterruptedException {
|
public void testUnknownHost() throws InterruptedException {
|
||||||
final Logger logger = mock(Logger.class);
|
final Logger logger = mock(Logger.class);
|
||||||
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
final String hostname = randomAlphaOfLength(8);
|
final String hostname = randomAlphaOfLength(8);
|
||||||
final UnknownHostException unknownHostException = new UnknownHostException(hostname);
|
final UnknownHostException unknownHostException = new UnknownHostException(hostname);
|
||||||
final Transport transport = new MockTcpTransport(
|
final Transport transport = new MockTcpTransport(
|
||||||
|
@ -504,7 +504,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
|
|
||||||
public void testResolveTimeout() throws InterruptedException {
|
public void testResolveTimeout() throws InterruptedException {
|
||||||
final Logger logger = mock(Logger.class);
|
final Logger logger = mock(Logger.class);
|
||||||
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
final Transport transport = new MockTcpTransport(
|
final Transport transport = new MockTcpTransport(
|
||||||
Settings.EMPTY,
|
Settings.EMPTY,
|
||||||
|
@ -568,9 +568,9 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException {
|
public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException {
|
||||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
final Settings settings = Settings.builder().put("cluster.name", "test").put(TcpTransport.PORT.getKey(), 0).build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
|
|
||||||
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
||||||
s,
|
s,
|
||||||
|
@ -633,9 +633,9 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPingingTemporalPings() throws ExecutionException, InterruptedException {
|
public void testPingingTemporalPings() throws ExecutionException, InterruptedException {
|
||||||
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
|
final Settings settings = Settings.builder().put("cluster.name", "test").put(TcpTransport.PORT.getKey(), 0).build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
|
|
||||||
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
|
||||||
s,
|
s,
|
||||||
|
@ -691,7 +691,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
||||||
|
|
||||||
public void testInvalidHosts() throws InterruptedException {
|
public void testInvalidHosts() throws InterruptedException {
|
||||||
final Logger logger = mock(Logger.class);
|
final Logger logger = mock(Logger.class);
|
||||||
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
final Transport transport = new MockTcpTransport(
|
final Transport transport = new MockTcpTransport(
|
||||||
Settings.EMPTY,
|
Settings.EMPTY,
|
||||||
threadPool,
|
threadPool,
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
|
||||||
clusterService = createClusterService(threadPool);
|
clusterService = createClusterService(threadPool);
|
||||||
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()),
|
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()),
|
||||||
new NetworkService(settings, Collections.emptyList()));
|
new NetworkService(Collections.emptyList()));
|
||||||
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
|
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
|
||||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
|
||||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||||
|
|
|
@ -46,11 +46,11 @@ public class PublishPortTests extends ESTestCase {
|
||||||
final Settings profileSettings;
|
final Settings profileSettings;
|
||||||
if (useProfile) {
|
if (useProfile) {
|
||||||
profile = "some_profile";
|
profile = "some_profile";
|
||||||
settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(TransportSettings.PUBLISH_PORT.getKey(), 9081).build();
|
settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build();
|
||||||
profileSettings = Settings.builder().put("publish_port", 9080).build();
|
profileSettings = Settings.builder().put("publish_port", 9080).build();
|
||||||
} else {
|
} else {
|
||||||
profile = TransportSettings.DEFAULT_PROFILE;
|
profile = TcpTransport.DEFAULT_PROFILE;
|
||||||
settings = Settings.builder().put(TransportSettings.PUBLISH_PORT.getKey(), 9081).build();
|
settings = Settings.builder().put(TcpTransport.PUBLISH_PORT.getKey(), 9081).build();
|
||||||
profileSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("publish_port", 9080).build();;
|
profileSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("publish_port", 9080).build();;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
|
||||||
BigArrays.NON_RECYCLING_INSTANCE,
|
BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(),
|
new NoneCircuitBreakerService(),
|
||||||
new NamedWriteableRegistry(Collections.emptyList()),
|
new NamedWriteableRegistry(Collections.emptyList()),
|
||||||
new NetworkService(settings, Collections.emptyList()));
|
new NetworkService(Collections.emptyList()));
|
||||||
TransportService transportService = new MockTransportService(settings, transport, threadPool,
|
TransportService transportService = new MockTransportService(settings, transport, threadPool,
|
||||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, (boundAddress) -> new DiscoveryNode(
|
TransportService.NOOP_TRANSPORT_INTERCEPTOR, (boundAddress) -> new DiscoveryNode(
|
||||||
nodeNameAndId,
|
nodeNameAndId,
|
||||||
|
|
|
@ -128,16 +128,16 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
||||||
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
|
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
|
||||||
|
|
||||||
public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
|
public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
|
||||||
boolSetting("http.tcp_no_delay", NetworkService.TcpSettings.TCP_NO_DELAY, Property.NodeScope);
|
boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Property.NodeScope);
|
||||||
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
|
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
|
||||||
boolSetting("http.tcp.keep_alive", NetworkService.TcpSettings.TCP_KEEP_ALIVE, Property.NodeScope);
|
boolSetting("http.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Property.NodeScope);
|
||||||
public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =
|
public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =
|
||||||
boolSetting("http.tcp.reuse_address", NetworkService.TcpSettings.TCP_REUSE_ADDRESS, Property.NodeScope);
|
boolSetting("http.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Property.NodeScope);
|
||||||
|
|
||||||
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_SEND_BUFFER_SIZE =
|
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_SEND_BUFFER_SIZE =
|
||||||
Setting.byteSizeSetting("http.tcp.send_buffer_size", NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE, Property.NodeScope);
|
Setting.byteSizeSetting("http.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Property.NodeScope);
|
||||||
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE =
|
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE =
|
||||||
Setting.byteSizeSetting("http.tcp.receive_buffer_size", NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE, Property.NodeScope);
|
Setting.byteSizeSetting("http.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Property.NodeScope);
|
||||||
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
|
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
|
||||||
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
|
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
|
||||||
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_MIN =
|
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_MIN =
|
||||||
|
@ -222,8 +222,14 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
||||||
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
|
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
|
||||||
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
|
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
|
||||||
this.port = SETTING_HTTP_PORT.get(settings);
|
this.port = SETTING_HTTP_PORT.get(settings);
|
||||||
this.bindHosts = SETTING_HTTP_BIND_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
|
// we can't make the network.bind_host a fallback since we already fall back to http.host hence the extra conditional here
|
||||||
this.publishHosts = SETTING_HTTP_PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
|
List<String> httpBindHost = SETTING_HTTP_BIND_HOST.get(settings);
|
||||||
|
this.bindHosts = (httpBindHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(settings) : httpBindHost)
|
||||||
|
.toArray(Strings.EMPTY_ARRAY);
|
||||||
|
// we can't make the network.publish_host a fallback since we already fall back to http.host hence the extra conditional here
|
||||||
|
List<String> httpPublishHost = SETTING_HTTP_PUBLISH_HOST.get(settings);
|
||||||
|
this.publishHosts = (httpPublishHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings) : httpPublishHost)
|
||||||
|
.toArray(Strings.EMPTY_ARRAY);
|
||||||
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
|
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
|
||||||
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
|
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
|
||||||
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
|
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
|
||||||
|
|
|
@ -47,7 +47,6 @@ import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.network.NetworkService.TcpSettings;
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -64,7 +63,6 @@ import org.elasticsearch.transport.ConnectionProfile;
|
||||||
import org.elasticsearch.transport.TcpTransport;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.TransportRequestOptions;
|
import org.elasticsearch.transport.TransportRequestOptions;
|
||||||
import org.elasticsearch.transport.TransportServiceAdapter;
|
import org.elasticsearch.transport.TransportServiceAdapter;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -209,23 +207,23 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
private Settings createFallbackSettings() {
|
private Settings createFallbackSettings() {
|
||||||
Settings.Builder fallbackSettingsBuilder = Settings.builder();
|
Settings.Builder fallbackSettingsBuilder = Settings.builder();
|
||||||
|
|
||||||
List<String> fallbackBindHost = TransportSettings.BIND_HOST.get(settings);
|
List<String> fallbackBindHost = TcpTransport.BIND_HOST.get(settings);
|
||||||
if (fallbackBindHost.isEmpty() == false) {
|
if (fallbackBindHost.isEmpty() == false) {
|
||||||
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
|
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> fallbackPublishHost = TransportSettings.PUBLISH_HOST.get(settings);
|
List<String> fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings);
|
||||||
if (fallbackPublishHost.isEmpty() == false) {
|
if (fallbackPublishHost.isEmpty() == false) {
|
||||||
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
|
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean fallbackTcpNoDelay = TcpSettings.TCP_NO_DELAY.get(settings);
|
boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings);
|
||||||
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
|
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
|
||||||
|
|
||||||
boolean fallbackTcpKeepAlive = TcpSettings.TCP_KEEP_ALIVE.get(settings);
|
boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings);
|
||||||
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
|
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
|
||||||
|
|
||||||
boolean fallbackReuseAddress = TcpSettings.TCP_REUSE_ADDRESS.get(settings);
|
boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings);
|
||||||
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
|
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
|
||||||
|
|
||||||
ByteSizeValue fallbackTcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
|
ByteSizeValue fallbackTcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
networkService = new NetworkService(Collections.emptyList());
|
||||||
threadPool = new TestThreadPool("test");
|
threadPool = new TestThreadPool("test");
|
||||||
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
networkService = new NetworkService(Collections.emptyList());
|
||||||
threadPool = new TestThreadPool("test");
|
threadPool = new TestThreadPool("test");
|
||||||
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
|
networkService = new NetworkService(Collections.emptyList());
|
||||||
threadPool = new TestThreadPool("test");
|
threadPool = new TestThreadPool("test");
|
||||||
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.transport.TransportResponse;
|
||||||
import org.elasticsearch.transport.TransportResponseHandler;
|
import org.elasticsearch.transport.TransportResponseHandler;
|
||||||
import org.elasticsearch.transport.TransportResponseOptions;
|
import org.elasticsearch.transport.TransportResponseOptions;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -54,21 +53,21 @@ public class Netty4ScheduledPingTests extends ESTestCase {
|
||||||
|
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put(TcpTransport.PING_SCHEDULE.getKey(), "5ms")
|
.put(TcpTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||||
.put(TransportSettings.PORT.getKey(), 0)
|
.put(TcpTransport.PORT.getKey(), 0)
|
||||||
.put("cluster.name", "test")
|
.put("cluster.name", "test")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||||
|
|
||||||
NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
|
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
|
||||||
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
|
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
|
||||||
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||||
null);
|
null);
|
||||||
serviceA.start();
|
serviceA.start();
|
||||||
serviceA.acceptIncomingRequests();
|
serviceA.acceptIncomingRequests();
|
||||||
|
|
||||||
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
|
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
|
||||||
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
|
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
|
||||||
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||||
null);
|
null);
|
||||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||||
import org.elasticsearch.mocksocket.MockSocket;
|
import org.elasticsearch.mocksocket.MockSocket;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -50,8 +50,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
|
||||||
|
|
||||||
private final Settings settings = Settings.builder()
|
private final Settings settings = Settings.builder()
|
||||||
.put("node.name", "NettySizeHeaderFrameDecoderTests")
|
.put("node.name", "NettySizeHeaderFrameDecoderTests")
|
||||||
.put(TransportSettings.BIND_HOST.getKey(), "127.0.0.1")
|
.put(TcpTransport.BIND_HOST.getKey(), "127.0.0.1")
|
||||||
.put(TransportSettings.PORT.getKey(), "0")
|
.put(TcpTransport.PORT.getKey(), "0")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private ThreadPool threadPool;
|
private ThreadPool threadPool;
|
||||||
|
@ -62,7 +62,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
|
||||||
@Before
|
@Before
|
||||||
public void startThreadPool() {
|
public void startThreadPool() {
|
||||||
threadPool = new ThreadPool(settings);
|
threadPool = new ThreadPool(settings);
|
||||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||||
nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays,
|
nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays,
|
||||||
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
||||||
|
|
|
@ -37,8 +37,8 @@ import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -80,7 +80,7 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase {
|
||||||
fail("Expected exception, but didn't happen");
|
fail("Expected exception, but didn't happen");
|
||||||
} catch (ElasticsearchException e) {
|
} catch (ElasticsearchException e) {
|
||||||
assertThat(e.getMessage(), containsString("MY MESSAGE"));
|
assertThat(e.getMessage(), containsString("MY MESSAGE"));
|
||||||
assertThat(channelProfileName, is(TransportSettings.DEFAULT_PROFILE));
|
assertThat(channelProfileName, is(TcpTransport.DEFAULT_PROFILE));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase {
|
||||||
InetSocketAddress remoteAddress, byte status) throws IOException {
|
InetSocketAddress remoteAddress, byte status) throws IOException {
|
||||||
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
|
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,
|
||||||
remoteAddress, status);
|
remoteAddress, status);
|
||||||
channelProfileName = TransportSettings.DEFAULT_PROFILE;
|
channelProfileName = TcpTransport.DEFAULT_PROFILE;
|
||||||
return action;
|
return action;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TcpTransport;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -54,7 +53,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||||
public void testThatNettyCanBindToMultiplePorts() throws Exception {
|
public void testThatNettyCanBindToMultiplePorts() throws Exception {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("network.host", host)
|
.put("network.host", host)
|
||||||
.put(TransportSettings.PORT.getKey(), 22) // will not actually bind to this
|
.put(TcpTransport.PORT.getKey(), 22) // will not actually bind to this
|
||||||
.put("transport.profiles.default.port", 0)
|
.put("transport.profiles.default.port", 0)
|
||||||
.put("transport.profiles.client1.port", 0)
|
.put("transport.profiles.client1.port", 0)
|
||||||
.build();
|
.build();
|
||||||
|
@ -71,7 +70,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||||
public void testThatDefaultProfileInheritsFromStandardSettings() throws Exception {
|
public void testThatDefaultProfileInheritsFromStandardSettings() throws Exception {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("network.host", host)
|
.put("network.host", host)
|
||||||
.put(TransportSettings.PORT.getKey(), 0)
|
.put(TcpTransport.PORT.getKey(), 0)
|
||||||
.put("transport.profiles.client1.port", 0)
|
.put("transport.profiles.client1.port", 0)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -88,7 +87,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||||
|
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("network.host", host)
|
.put("network.host", host)
|
||||||
.put(TransportSettings.PORT.getKey(), 0)
|
.put(TcpTransport.PORT.getKey(), 0)
|
||||||
.put("transport.profiles.client1.whatever", "foo")
|
.put("transport.profiles.client1.whatever", "foo")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -104,7 +103,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||||
public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exception {
|
public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exception {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("network.host", host)
|
.put("network.host", host)
|
||||||
.put(TransportSettings.PORT.getKey(), 22) // will not actually bind to this
|
.put(TcpTransport.PORT.getKey(), 22) // will not actually bind to this
|
||||||
.put("transport.profiles.default.port", 0)
|
.put("transport.profiles.default.port", 0)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -120,7 +119,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||||
public void testThatProfileWithoutValidNameIsIgnored() throws Exception {
|
public void testThatProfileWithoutValidNameIsIgnored() throws Exception {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("network.host", host)
|
.put("network.host", host)
|
||||||
.put(TransportSettings.PORT.getKey(), 0)
|
.put(TcpTransport.PORT.getKey(), 0)
|
||||||
// mimics someone trying to define a profile for .local which is the profile for a node request to itself
|
// mimics someone trying to define a profile for .local which is the profile for a node request to itself
|
||||||
.put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", 22) // will not actually bind to this
|
.put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", 22) // will not actually bind to this
|
||||||
.put("transport.profiles..port", 23) // will not actually bind to this
|
.put("transport.profiles..port", 23) // will not actually bind to this
|
||||||
|
@ -137,7 +136,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
|
||||||
|
|
||||||
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
|
private TcpTransport<?> startTransport(Settings settings, ThreadPool threadPool) {
|
||||||
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
|
||||||
TcpTransport<?> transport = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
|
TcpTransport<?> transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
|
||||||
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
|
||||||
transport.start();
|
transport.start();
|
||||||
|
|
||||||
|
|
|
@ -36,9 +36,9 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||||
import org.elasticsearch.transport.BindTransportException;
|
import org.elasticsearch.transport.BindTransportException;
|
||||||
import org.elasticsearch.transport.ConnectTransportException;
|
import org.elasticsearch.transport.ConnectTransportException;
|
||||||
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
@ -54,7 +54,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
|
||||||
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
|
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
|
||||||
ClusterSettings clusterSettings, boolean doHandshake) {
|
ClusterSettings clusterSettings, boolean doHandshake) {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
|
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
|
||||||
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -80,7 +80,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
|
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
|
||||||
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
|
settings = Settings.builder().put(settings).put(TcpTransport.PORT.getKey(), "0").build();
|
||||||
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
|
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
|
||||||
transportService.start();
|
transportService.start();
|
||||||
return transportService;
|
return transportService;
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Disc
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
|
||||||
import org.elasticsearch.common.network.InetAddresses;
|
import org.elasticsearch.common.network.InetAddresses;
|
||||||
import org.elasticsearch.common.network.NetworkAddress;
|
import org.elasticsearch.common.network.NetworkAddress;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
|
@ -162,7 +161,8 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
||||||
|
|
||||||
InetAddress ipAddress = null;
|
InetAddress ipAddress = null;
|
||||||
try {
|
try {
|
||||||
ipAddress = networkService.resolvePublishHostAddresses(null);
|
ipAddress = networkService.resolvePublishHostAddresses(
|
||||||
|
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY));
|
||||||
logger.trace("ip of current node: [{}]", ipAddress);
|
logger.trace("ip of current node: [{}]", ipAddress);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// We can't find the publish host address... Hmmm. Too bad :-(
|
// We can't find the publish host address... Hmmm. Too bad :-(
|
||||||
|
|
|
@ -37,7 +37,7 @@ import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.plugin.discovery.azure.classic.AzureDiscoveryPlugin;
|
import org.elasticsearch.plugin.discovery.azure.classic.AzureDiscoveryPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
@ -108,7 +108,7 @@ public class AzureDiscoveryClusterFormationTests extends ESIntegTestCase {
|
||||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||||
.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), AzureDiscoveryPlugin.AZURE)
|
.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), AzureDiscoveryPlugin.AZURE)
|
||||||
.put(Environment.PATH_LOGS_SETTING.getKey(), resolve)
|
.put(Environment.PATH_LOGS_SETTING.getKey(), resolve)
|
||||||
.put(TransportSettings.PORT.getKey(), 0)
|
.put(TcpTransport.PORT.getKey(), 0)
|
||||||
.put(Node.WRITE_PORTS_FILE_SETTING.getKey(), "true")
|
.put(Node.WRITE_PORTS_FILE_SETTING.getKey(), "true")
|
||||||
.put(AzureComputeService.Management.ENDPOINT_SETTING.getKey(), "https://" + InetAddress.getLoopbackAddress().getHostAddress() +
|
.put(AzureComputeService.Management.ENDPOINT_SETTING.getKey(), "https://" + InetAddress.getLoopbackAddress().getHostAddress() +
|
||||||
":" + httpsServer.getAddress().getPort())
|
":" + httpsServer.getAddress().getPort())
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
||||||
public void createTransportService() {
|
public void createTransportService() {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
|
||||||
Version.CURRENT) {
|
Version.CURRENT) {
|
||||||
@Override
|
@Override
|
||||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2_")
|
.put("network.host", "_ec2_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -59,7 +59,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2:publicIp_")
|
.put("network.host", "_ec2:publicIp_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -76,7 +76,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2:privateIp_")
|
.put("network.host", "_ec2:privateIp_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -93,7 +93,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2:privateIpv4_")
|
.put("network.host", "_ec2:privateIpv4_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -110,7 +110,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2:privateDns_")
|
.put("network.host", "_ec2:privateDns_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -127,7 +127,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2:publicIpv4_")
|
.put("network.host", "_ec2:publicIpv4_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -144,7 +144,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_ec2:publicDns_")
|
.put("network.host", "_ec2:publicDns_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
// TODO we need to replace that with a mock. For now we check the URL we are supposed to reach.
|
||||||
try {
|
try {
|
||||||
networkService.resolveBindHostAddresses(null);
|
networkService.resolveBindHostAddresses(null);
|
||||||
|
@ -162,7 +162,7 @@ public class Ec2NetworkTests extends ESTestCase {
|
||||||
.put("network.host", "_local_")
|
.put("network.host", "_local_")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new Ec2NameResolver(nodeSettings)));
|
||||||
InetAddress[] addresses = networkService.resolveBindHostAddresses(null);
|
InetAddress[] addresses = networkService.resolveBindHostAddresses(null);
|
||||||
assertThat(addresses, arrayContaining(networkService.resolveBindHostAddresses(new String[] { "_local_" })));
|
assertThat(addresses, arrayContaining(networkService.resolveBindHostAddresses(new String[] { "_local_" })));
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
||||||
BigArrays.NON_RECYCLING_INSTANCE,
|
BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(),
|
new NoneCircuitBreakerService(),
|
||||||
new NamedWriteableRegistry(Collections.emptyList()),
|
new NamedWriteableRegistry(Collections.emptyList()),
|
||||||
new NetworkService(Settings.EMPTY, Collections.emptyList())) {
|
new NetworkService(Collections.emptyList())) {
|
||||||
@Override
|
@Override
|
||||||
public BoundTransportAddress boundAddress() {
|
public BoundTransportAddress boundAddress() {
|
||||||
return new BoundTransportAddress(
|
return new BoundTransportAddress(
|
||||||
|
|
|
@ -118,7 +118,8 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
||||||
cachedDiscoNodes = new ArrayList<>();
|
cachedDiscoNodes = new ArrayList<>();
|
||||||
String ipAddress = null;
|
String ipAddress = null;
|
||||||
try {
|
try {
|
||||||
InetAddress inetAddress = networkService.resolvePublishHostAddresses(null);
|
InetAddress inetAddress = networkService.resolvePublishHostAddresses(
|
||||||
|
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY));
|
||||||
if (inetAddress != null) {
|
if (inetAddress != null) {
|
||||||
ipAddress = NetworkAddress.format(inetAddress);
|
ipAddress = NetworkAddress.format(inetAddress);
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class GceDiscoveryTests extends ESTestCase {
|
||||||
|
|
||||||
protected List<DiscoveryNode> buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) {
|
protected List<DiscoveryNode> buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) {
|
||||||
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
|
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
|
||||||
transportService, new NetworkService(Settings.EMPTY, Collections.emptyList()));
|
transportService, new NetworkService(Collections.emptyList()));
|
||||||
|
|
||||||
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
|
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
|
||||||
logger.info("--> nodes found: {}", discoveryNodes);
|
logger.info("--> nodes found: {}", discoveryNodes);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.discovery.gce;
|
package org.elasticsearch.discovery.gce;
|
||||||
|
|
||||||
import org.elasticsearch.cloud.gce.network.GceNameResolver;
|
import org.elasticsearch.cloud.gce.network.GceNameResolver;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -27,7 +28,6 @@ import org.elasticsearch.test.ESTestCase;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.arrayContaining;
|
import static org.hamcrest.Matchers.arrayContaining;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
@ -81,7 +81,7 @@ public class GceNetworkTests extends ESTestCase {
|
||||||
* network.host: _local_
|
* network.host: _local_
|
||||||
*/
|
*/
|
||||||
public void networkHostCoreLocal() throws IOException {
|
public void networkHostCoreLocal() throws IOException {
|
||||||
resolveGce("_local_", new NetworkService(Settings.EMPTY, Collections.emptyList())
|
resolveGce("_local_", new NetworkService(Collections.emptyList())
|
||||||
.resolveBindHostAddresses(new String[] { NetworkService.DEFAULT_NETWORK_HOST }));
|
.resolveBindHostAddresses(new String[] { NetworkService.DEFAULT_NETWORK_HOST }));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,9 +107,10 @@ public class GceNetworkTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
GceMetadataServiceMock mock = new GceMetadataServiceMock(nodeSettings);
|
GceMetadataServiceMock mock = new GceMetadataServiceMock(nodeSettings);
|
||||||
NetworkService networkService = new NetworkService(nodeSettings, Collections.singletonList(new GceNameResolver(nodeSettings, mock)));
|
NetworkService networkService = new NetworkService(Collections.singletonList(new GceNameResolver(nodeSettings, mock)));
|
||||||
try {
|
try {
|
||||||
InetAddress[] addresses = networkService.resolveBindHostAddresses(null);
|
InetAddress[] addresses = networkService.resolveBindHostAddresses(
|
||||||
|
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING.get(nodeSettings).toArray(Strings.EMPTY_ARRAY));
|
||||||
if (expected == null) {
|
if (expected == null) {
|
||||||
fail("We should get a IllegalArgumentException when setting network.host: _gce:doesnotexist_");
|
fail("We should get a IllegalArgumentException when setting network.host: _gce:doesnotexist_");
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,6 @@ import org.elasticsearch.transport.MockTransportClient;
|
||||||
import org.elasticsearch.transport.TcpTransport;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -319,7 +318,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||||
builder.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), baseDir.resolve("custom"));
|
builder.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), baseDir.resolve("custom"));
|
||||||
builder.put(Environment.PATH_HOME_SETTING.getKey(), baseDir);
|
builder.put(Environment.PATH_HOME_SETTING.getKey(), baseDir);
|
||||||
builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos"));
|
builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos"));
|
||||||
builder.put(TransportSettings.PORT.getKey(), TRANSPORT_BASE_PORT + "-" + (TRANSPORT_BASE_PORT + PORTS_PER_CLUSTER));
|
builder.put(TcpTransport.PORT.getKey(), TRANSPORT_BASE_PORT + "-" + (TRANSPORT_BASE_PORT + PORTS_PER_CLUSTER));
|
||||||
builder.put("http.port", HTTP_BASE_PORT + "-" + (HTTP_BASE_PORT + PORTS_PER_CLUSTER));
|
builder.put("http.port", HTTP_BASE_PORT + "-" + (HTTP_BASE_PORT + PORTS_PER_CLUSTER));
|
||||||
builder.put("http.pipelining", enableHttpPipelining);
|
builder.put("http.pipelining", enableHttpPipelining);
|
||||||
if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) {
|
if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) {
|
||||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.mocksocket.MockServerSocket;
|
import org.elasticsearch.mocksocket.MockServerSocket;
|
||||||
import org.elasticsearch.test.InternalTestCluster;
|
import org.elasticsearch.test.InternalTestCluster;
|
||||||
import org.elasticsearch.test.NodeConfigurationSource;
|
import org.elasticsearch.test.NodeConfigurationSource;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -122,8 +122,8 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource {
|
||||||
throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]");
|
throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]");
|
||||||
} else {
|
} else {
|
||||||
// we need to pin the node port & host so we'd know where to point things
|
// we need to pin the node port & host so we'd know where to point things
|
||||||
builder.put(TransportSettings.PORT.getKey(), unicastHostPorts[nodeOrdinal]);
|
builder.put(TcpTransport.PORT.getKey(), unicastHostPorts[nodeOrdinal]);
|
||||||
builder.put(TransportSettings.HOST.getKey(), IP_ADDR); // only bind on one IF we use v4 here by default
|
builder.put(TcpTransport.HOST.getKey(), IP_ADDR); // only bind on one IF we use v4 here by default
|
||||||
builder.put(NetworkModule.HTTP_ENABLED.getKey(), false);
|
builder.put(NetworkModule.HTTP_ENABLED.getKey(), false);
|
||||||
for (int i = 0; i < unicastHostOrdinals.length; i++) {
|
for (int i = 0; i < unicastHostOrdinals.length; i++) {
|
||||||
unicastHosts[i] = IP_ADDR + ":" + (unicastHostPorts[unicastHostOrdinals[i]]);
|
unicastHosts[i] = IP_ADDR + ":" + (unicastHostPorts[unicastHostOrdinals[i]]);
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.transport.ConnectTransportException;
|
||||||
import org.elasticsearch.transport.ConnectionProfile;
|
import org.elasticsearch.transport.ConnectionProfile;
|
||||||
import org.elasticsearch.transport.MockTcpTransport;
|
import org.elasticsearch.transport.MockTcpTransport;
|
||||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||||
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportException;
|
import org.elasticsearch.transport.TransportException;
|
||||||
import org.elasticsearch.transport.TransportInterceptor;
|
import org.elasticsearch.transport.TransportInterceptor;
|
||||||
|
@ -100,7 +101,7 @@ public final class MockTransportService extends TransportService {
|
||||||
@Nullable ClusterSettings clusterSettings) {
|
@Nullable ClusterSettings clusterSettings) {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
||||||
final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version);
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
|
||||||
return createNewService(settings, transport, version, threadPool, clusterSettings);
|
return createNewService(settings, transport, version, threadPool, clusterSettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,7 +360,7 @@ public final class MockTransportService extends TransportService {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Replace with proper setting
|
// TODO: Replace with proper setting
|
||||||
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
|
TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
|
||||||
try {
|
try {
|
||||||
if (delay.millis() < connectingTimeout.millis()) {
|
if (delay.millis() < connectingTimeout.millis()) {
|
||||||
Thread.sleep(delay.millis());
|
Thread.sleep(delay.millis());
|
||||||
|
@ -381,7 +382,7 @@ public final class MockTransportService extends TransportService {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Replace with proper setting
|
// TODO: Replace with proper setting
|
||||||
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
|
TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
|
||||||
try {
|
try {
|
||||||
if (delay.millis() < connectingTimeout.millis()) {
|
if (delay.millis() < connectingTimeout.millis()) {
|
||||||
Thread.sleep(delay.millis());
|
Thread.sleep(delay.millis());
|
||||||
|
|
|
@ -1906,7 +1906,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||||
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
|
||||||
Version.fromString("2.0.0"))) {
|
Version.fromString("2.0.0"))) {
|
||||||
transport.transportServiceAdapter(serviceA.new Adapter());
|
transport.transportServiceAdapter(serviceA.new Adapter());
|
||||||
transport.start();
|
transport.start();
|
||||||
|
@ -1928,7 +1928,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
|
Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
|
||||||
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),version)) {
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),version)) {
|
||||||
transport.transportServiceAdapter(serviceA.new Adapter());
|
transport.transportServiceAdapter(serviceA.new Adapter());
|
||||||
transport.start();
|
transport.start();
|
||||||
DiscoveryNode node =
|
DiscoveryNode node =
|
||||||
|
@ -1954,7 +1954,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
|
|
||||||
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList())) {
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
|
||||||
@Override
|
@Override
|
||||||
protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId,
|
protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId,
|
||||||
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
|
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
|
||||||
|
|
|
@ -390,9 +390,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
|
||||||
if (NetworkService.NETWORK_SERVER.get(settings)) {
|
if (NetworkService.NETWORK_SERVER.get(settings)) {
|
||||||
// loop through all profiles and start them up, special handling for default one
|
// loop through all profiles and start them up, special handling for default one
|
||||||
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
|
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
|
||||||
final Settings settings = Settings.builder()
|
final Settings profileSettings = Settings.builder()
|
||||||
.put(entry.getValue()).build();
|
.put(entry.getValue()).build();
|
||||||
bindServer(entry.getKey(), settings);
|
bindServer(entry.getKey(), profileSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
super.doStart();
|
super.doStart();
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.ConnectionProfile;
|
import org.elasticsearch.transport.ConnectionProfile;
|
||||||
import org.elasticsearch.transport.TcpTransport;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
import org.elasticsearch.transport.Transports;
|
import org.elasticsearch.transport.Transports;
|
||||||
import org.elasticsearch.transport.nio.channel.ChannelFactory;
|
import org.elasticsearch.transport.nio.channel.ChannelFactory;
|
||||||
import org.elasticsearch.transport.nio.channel.NioChannel;
|
import org.elasticsearch.transport.nio.channel.NioChannel;
|
||||||
|
@ -178,11 +177,11 @@ public class NioTransport extends TcpTransport<NioChannel> {
|
||||||
// loop through all profiles and start them up, special handling for default one
|
// loop through all profiles and start them up, special handling for default one
|
||||||
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
|
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
|
||||||
// merge fallback settings with default settings with profile settings so we have complete settings with default values
|
// merge fallback settings with default settings with profile settings so we have complete settings with default values
|
||||||
final Settings settings = Settings.builder()
|
final Settings profileSettings = Settings.builder()
|
||||||
.put(createFallbackSettings())
|
.put(createFallbackSettings())
|
||||||
.put(entry.getValue()).build();
|
.put(entry.getValue()).build();
|
||||||
profileToChannelFactory.putIfAbsent(entry.getKey(), new ChannelFactory(settings, tcpReadHandler));
|
profileToChannelFactory.putIfAbsent(entry.getKey(), new ChannelFactory(profileSettings, tcpReadHandler));
|
||||||
bindServer(entry.getKey(), settings);
|
bindServer(entry.getKey(), profileSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
client = createClient();
|
client = createClient();
|
||||||
|
@ -236,36 +235,31 @@ public class NioTransport extends TcpTransport<NioChannel> {
|
||||||
private Settings createFallbackSettings() {
|
private Settings createFallbackSettings() {
|
||||||
Settings.Builder fallbackSettingsBuilder = Settings.builder();
|
Settings.Builder fallbackSettingsBuilder = Settings.builder();
|
||||||
|
|
||||||
List<String> fallbackBindHost = TransportSettings.BIND_HOST.get(settings);
|
List<String> fallbackBindHost = TcpTransport.BIND_HOST.get(settings);
|
||||||
if (fallbackBindHost.isEmpty() == false) {
|
if (fallbackBindHost.isEmpty() == false) {
|
||||||
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
|
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> fallbackPublishHost = TransportSettings.PUBLISH_HOST.get(settings);
|
List<String> fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings);
|
||||||
if (fallbackPublishHost.isEmpty() == false) {
|
if (fallbackPublishHost.isEmpty() == false) {
|
||||||
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
|
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean fallbackTcpNoDelay = settings.getAsBoolean("transport.nio.tcp_no_delay",
|
boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings);
|
||||||
NetworkService.TcpSettings.TCP_NO_DELAY.get(settings));
|
|
||||||
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
|
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
|
||||||
|
|
||||||
boolean fallbackTcpKeepAlive = settings.getAsBoolean("transport.nio.tcp_keep_alive",
|
boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings);
|
||||||
NetworkService.TcpSettings.TCP_KEEP_ALIVE.get(settings));
|
|
||||||
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
|
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
|
||||||
|
|
||||||
boolean fallbackReuseAddress = settings.getAsBoolean("transport.nio.reuse_address",
|
boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings);;
|
||||||
NetworkService.TcpSettings.TCP_REUSE_ADDRESS.get(settings));
|
|
||||||
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
|
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
|
||||||
|
|
||||||
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.nio.tcp_send_buffer_size",
|
ByteSizeValue fallbackTcpSendBufferSize = TcpTransport.TCP_SEND_BUFFER_SIZE.get(settings);
|
||||||
TCP_SEND_BUFFER_SIZE.get(settings));
|
|
||||||
if (fallbackTcpSendBufferSize.getBytes() >= 0) {
|
if (fallbackTcpSendBufferSize.getBytes() >= 0) {
|
||||||
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
|
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.nio.tcp_receive_buffer_size",
|
ByteSizeValue fallbackTcpBufferSize = TcpTransport.TCP_RECEIVE_BUFFER_SIZE.get(settings);;
|
||||||
TCP_RECEIVE_BUFFER_SIZE.get(settings));
|
|
||||||
if (fallbackTcpBufferSize.getBytes() >= 0) {
|
if (fallbackTcpBufferSize.getBytes() >= 0) {
|
||||||
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
|
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.elasticsearch.test.InternalTestCluster;
|
||||||
import org.elasticsearch.test.NodeConfigurationSource;
|
import org.elasticsearch.test.NodeConfigurationSource;
|
||||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||||
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
import org.elasticsearch.transport.MockTcpTransportPlugin;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
@ -102,7 +102,7 @@ public class InternalTestClusterTests extends ESTestCase {
|
||||||
|
|
||||||
static {
|
static {
|
||||||
clusterUniqueSettings.add(ClusterName.CLUSTER_NAME_SETTING.getKey());
|
clusterUniqueSettings.add(ClusterName.CLUSTER_NAME_SETTING.getKey());
|
||||||
clusterUniqueSettings.add(TransportSettings.PORT.getKey());
|
clusterUniqueSettings.add(TcpTransport.PORT.getKey());
|
||||||
clusterUniqueSettings.add("http.port");
|
clusterUniqueSettings.add("http.port");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
|
||||||
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
|
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version) {
|
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) {
|
||||||
@Override
|
@Override
|
||||||
protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel, TimeValue timeout) throws IOException,
|
protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel, TimeValue timeout) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
|
|
@ -35,9 +35,9 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||||
import org.elasticsearch.transport.BindTransportException;
|
import org.elasticsearch.transport.BindTransportException;
|
||||||
import org.elasticsearch.transport.ConnectTransportException;
|
import org.elasticsearch.transport.ConnectTransportException;
|
||||||
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.TransportSettings;
|
|
||||||
import org.elasticsearch.transport.nio.channel.NioChannel;
|
import org.elasticsearch.transport.nio.channel.NioChannel;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -55,7 +55,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
|
||||||
public static MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
|
public static MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
|
||||||
ClusterSettings clusterSettings, boolean doHandshake) {
|
ClusterSettings clusterSettings, boolean doHandshake) {
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
|
NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||||
Transport transport = new NioTransport(settings, threadPool,
|
Transport transport = new NioTransport(settings, threadPool,
|
||||||
networkService,
|
networkService,
|
||||||
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
|
||||||
|
@ -88,7 +88,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
|
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
|
||||||
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
|
settings = Settings.builder().put(settings).put(TcpTransport.PORT.getKey(), "0").build();
|
||||||
MockTransportService transportService = nioFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
|
MockTransportService transportService = nioFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
|
||||||
transportService.start();
|
transportService.start();
|
||||||
return transportService;
|
return transportService;
|
||||||
|
|
Loading…
Reference in New Issue