Netty: Support to bind on multiple host/port pairs

This patch allows to create several netty bootstrap, each of which
listening on different ports. This will potentially allow for features
to listen to different network interfaces for node-to-node or node-to-client
communication and is also the base to listen to several interfaces, so that those
can be used to speed up cluster communication in the future.

Closes #8098
This commit is contained in:
Alexander Reelsen 2014-10-12 18:30:12 +02:00
parent 5d53d03c2d
commit b06b52449f
6 changed files with 575 additions and 110 deletions

View File

@ -48,6 +48,33 @@ between all nodes. Defaults to `false`.
It also uses the common
<<modules-network,network settings>>.
==== TCP Transport Profiles
Elasticsearch allows you to bind to multiple ports on different interfaces by the use of transport profiles. See this example configuration
```
transport.profiles.default.port: 9300-9400
transport.profiles.default.bind_host: 10.0.0.1
transport.profiles.client.port: 9500-9600
transport.profiles.client.bind_host: 192.168.0.1
transport.profiles.dmz.port: 9700-9800
transport.profiles.dmz.bind_host: 172.16.1.2
```
The `default` profile is a special. It is used as fallback for any other profiles, if those do not have a specific configuration setting set.
Note that the default profile is how other nodes in the cluster will connect to this node usually. In the future this feature will allow to enable node-to-node communication via multiple interfaces.
The following parameters can be configured like that
* `port`: The port to bind to
* `bind_host`: The host to bind
* `publish_host`: The host which is published in informational APIs
* `tcp_no_delay`: Configures the `TCP_NO_DELAY` option for this socket
* `tcp_keep_alive`: Configures the `SO_KEEPALIVE` option for this socket
* `reuse_address`: Configures the `SO_REUSEADDR` option for this socket
* `tcp_send_buffer_size`: Configures the send buffer size of the socket
* `tcp_receive_buffer_size`: Configures the receive buffer size of the socket
[float]
=== Local Transport

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent;
import com.google.common.base.Joiner;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.*;
@ -72,6 +73,10 @@ public class EsExecutors {
return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy());
}
public static String threadName(Settings settings, String ... names) {
return threadName(settings, "[" + Joiner.on(".").skipNulls().join(names) + "]");
}
public static String threadName(Settings settings, String namePrefix) {
String name = settings.get("name");
if (name == null) {
@ -86,6 +91,10 @@ public class EsExecutors {
return daemonThreadFactory(threadName(settings, namePrefix));
}
public static ThreadFactory daemonThreadFactory(Settings settings, String ... names) {
return daemonThreadFactory(threadName(settings, names));
}
public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new EsThreadFactory(namePrefix);
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
@ -39,6 +40,7 @@ import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -70,20 +72,15 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@ -107,17 +104,33 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg";
public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state";
public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping";
private static final String DEFAULT_PORT_RANGE = "9300-9400";
private final NetworkService networkService;
final Version version;
private final boolean blockingClient;
private final TimeValue connectTimeout;
private final ByteSizeValue maxCumulationBufferCapacity;
private final int maxCompositeBufferComponents;
final boolean compress;
private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
private final int workerCount;
private final ByteSizeValue receivePredictorMin;
private final ByteSizeValue receivePredictorMax;
final int connectionsPerNodeRecovery;
final int connectionsPerNodeBulk;
final int connectionsPerNodeReg;
final int connectionsPerNodeState;
final int connectionsPerNodePing;
/*
final int workerCount;
final int bossCount;
final boolean blockingServer;
final boolean blockingClient;
final String port;
final String bindHost;
@ -135,7 +148,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final ByteSizeValue tcpSendBufferSize;
final ByteSizeValue tcpReceiveBufferSize;
final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
final int connectionsPerNodeRecovery;
final int connectionsPerNodeBulk;
@ -145,27 +157,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final ByteSizeValue maxCumulationBufferCapacity;
final int maxCompositeBufferComponents;
*/
final BigArrays bigArrays;
private final ThreadPool threadPool;
private volatile OpenChannelsHandler serverOpenChannels;
private volatile ClientBootstrap clientBootstrap;
private volatile ServerBootstrap serverBootstrap;
// node id to actual channel
final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private volatile Channel serverChannel;
private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
private final Map<String, Channel> serverChannels = newConcurrentMap();
private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress;
private final KeyedLock<String> connectionLock = new KeyedLock<>();
// this lock is here to make sure we close this transport and disconnect all the client nodes
@ -185,20 +188,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
this.workerCount = settings.getAsInt(WORKER_COUNT, EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
this.bindHost = componentSettings.get("bind_host", settings.get("transport.bind_host", settings.get("transport.host")));
this.publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
this.publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", 0));
this.compress = settings.getAsBoolean(TransportSettings.TRANSPORT_TCP_COMPRESS, false);
this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", settings.getAsTime(TCP_CONNECT_TIMEOUT, TCP_DEFAULT_CONNECT_TIMEOUT)));
this.tcpNoDelay = componentSettings.get("tcp_no_delay", settings.get(TCP_NO_DELAY, "true"));
this.tcpKeepAlive = componentSettings.get("tcp_keep_alive", settings.get(TCP_KEEP_ALIVE, "true"));
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
this.compress = settings.getAsBoolean(TransportSettings.TRANSPORT_TCP_COMPRESS, false);
this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt(CONNECTIONS_PER_NODE_RECOVERY, 2));
this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt(CONNECTIONS_PER_NODE_BULK, 3));
this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt(CONNECTIONS_PER_NODE_REG, 6));
@ -216,9 +211,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.state] to 0");
}
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
long defaultReceiverPredictor = 512 * 1024;
if (JvmInfo.jvmInfo().mem().directMemoryMax().bytes() > 0) {
// we can guess a better default...
@ -227,16 +219,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
this.receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
this.receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", new ByteSizeValue(defaultReceiverPredictor)));
if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
} else {
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
}
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
}
public Settings settings() {
@ -258,9 +247,75 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override
protected void doStart() throws ElasticsearchException {
clientBootstrap = createClientBootstrap();
if (!settings.getAsBoolean("network.server", true)) {
return;
}
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
// extract default profile first and create standard bootstrap
Map<String, Settings> profiles = settings.getGroups("transport.profiles", true);
if (!profiles.containsKey("default")) {
profiles = Maps.newHashMap(profiles);
profiles.put("default", ImmutableSettings.EMPTY);
}
Settings fallbackSettings = createFallbackSettings();
Settings defaultSettings = profiles.get("default");
// loop through all profiles and strart them app, special handling for default one
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
Settings profileSettings = entry.getValue();
String name = entry.getKey();
if ("default".equals(name)) {
profileSettings = settingsBuilder()
.put(profileSettings)
.put("port", profileSettings.get("port", componentSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE))))
.build();
} else {
// if profile does not have a port, skip it
if (profileSettings.get("port") == null) {
logger.info("No port configured for profile [{}], not binding", name);
continue;
}
}
// merge fallback settings with default settings with profile settings so we have complete settings with default values
Settings mergedSettings = settingsBuilder()
.put(fallbackSettings)
.put(defaultSettings)
.put(profileSettings)
.build();
createServerBootstrap(name, mergedSettings);
bindServerBootstrap(name, mergedSettings);
}
InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get("default").getLocalAddress();
InetSocketAddress publishAddress;
int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", 0));
if (0 == publishPort) {
publishPort = boundAddress.getPort();
}
try {
String publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort);
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
private ClientBootstrap createClientBootstrap() {
if (blockingClient) {
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker"))));
} else {
int bossCount = componentSettings.getAsInt("boss_count", 1);
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_boss")),
bossCount,
@ -269,41 +324,138 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
String tcpNoDelay = componentSettings.get("tcp_no_delay", settings.get(TCP_NO_DELAY, "true"));
if (!"default".equals(tcpNoDelay)) {
clientBootstrap.setOption("tcpNoDelay", Booleans.parseBoolean(tcpNoDelay, null));
}
String tcpKeepAlive = componentSettings.get("tcp_keep_alive", settings.get(TCP_KEEP_ALIVE, "true"));
if (!"default".equals(tcpKeepAlive)) {
clientBootstrap.setOption("keepAlive", Booleans.parseBoolean(tcpKeepAlive, null));
}
ByteSizeValue tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
clientBootstrap.setOption("sendBufferSize", tcpSendBufferSize.bytes());
}
ByteSizeValue tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
}
clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
Boolean reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
if (reuseAddress != null) {
clientBootstrap.setOption("reuseAddress", reuseAddress);
}
if (!settings.getAsBoolean("network.server", true)) {
return;
return clientBootstrap;
}
private Settings createFallbackSettings() {
ImmutableSettings.Builder fallbackSettingsBuilder = settingsBuilder();
String fallbackBindHost = componentSettings.get("bind_host", settings.get("transport.bind_host", settings.get("transport.host")));
if (fallbackBindHost != null) {
fallbackSettingsBuilder.put("bind_host", fallbackBindHost);
}
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
String fallbackPublishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
if (fallbackPublishHost != null) {
fallbackSettingsBuilder.put("publish_host", fallbackPublishHost);
}
String fallbackTcpNoDelay = componentSettings.get("tcp_no_delay", settings.get(TCP_NO_DELAY, "true"));
if (fallbackTcpNoDelay != null) {
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
}
String fallbackTcpKeepAlive = componentSettings.get("tcp_keep_alive", settings.get(TCP_KEEP_ALIVE, "true"));
if (fallbackTcpKeepAlive != null) {
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
}
Boolean fallbackReuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
if (fallbackReuseAddress != null) {
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
}
ByteSizeValue fallbackTcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
if (fallbackTcpSendBufferSize != null) {
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
}
ByteSizeValue fallbackTcpBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
if (fallbackTcpBufferSize != null) {
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
}
return fallbackSettingsBuilder.build();
}
private void bindServerBootstrap(final String name, final Settings settings) {
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
String bindHost = settings.get("bind_host");
try {
hostAddressX = networkService.resolveBindHostAddress(bindHost);
} catch (IOException e) {
throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
}
final InetAddress hostAddress = hostAddressX;
String port = settings.get("port");
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
try {
serverChannels.put(name, serverBootstraps.get(name).bind(new InetSocketAddress(hostAddress, portNumber)));
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
}
logger.debug("Bound profile [{}] to address [{}]", name, serverChannels.get(name).getLocalAddress());
}
private void createServerBootstrap(String name, Settings settings) {
boolean blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", this.settings.getAsBoolean(TCP_BLOCKING_SERVER, this.settings.getAsBoolean(TCP_BLOCKING, false)));
String port = settings.get("port");
String bindHost = settings.get("bind_host");
String publishHost = settings.get("publish_host");
String tcpNoDelay = settings.get("tcp_no_delay");
String tcpKeepAlive = settings.get("tcp_keep_alive");
Boolean reuseAddress = settings.getAsBoolean("reuse_address", NetworkUtils.defaultReuseAddress());
ByteSizeValue tcpSendBufferSize = settings.getAsBytesSize("tcp_send_buffer_size", TCP_DEFAULT_SEND_BUFFER_SIZE);
ByteSizeValue tcpReceiveBufferSize = settings.getAsBytesSize("tcp_receive_buffer_size", TCP_DEFAULT_RECEIVE_BUFFER_SIZE);
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
ServerBootstrap serverBootstrap;
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker"))
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_boss", name)),
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_worker", name))
));
} else {
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")),
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_boss", name)),
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_worker", name)),
workerCount));
}
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory(name, settings));
if (!"default".equals(tcpNoDelay)) {
serverBootstrap.setOption("child.tcpNoDelay", Booleans.parseBoolean(tcpNoDelay, null));
}
@ -323,47 +475,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
}
// Bind and start to accept incoming connections.
InetAddress hostAddressX;
try {
hostAddressX = networkService.resolveBindHostAddress(bindHost);
} catch (IOException e) {
throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
}
final InetAddress hostAddress = hostAddressX;
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
try {
serverChannel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
}
logger.debug("Bound to address [{}]", serverChannel.getLocalAddress());
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
int publishPort = this.publishPort;
if (0 == publishPort) {
publishPort = boundAddress.getPort();
}
try {
publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort);
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
serverBootstraps.put(name, serverBootstrap);
}
@Override
@ -381,12 +493,17 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
nodeChannels.close();
}
if (serverChannel != null) {
Iterator<Map.Entry<String, Channel>> serverChannelIterator = serverChannels.entrySet().iterator();
while (serverChannelIterator.hasNext()) {
Map.Entry<String, Channel> serverChannelEntry = serverChannelIterator.next();
String name = serverChannelEntry.getKey();
Channel serverChannel = serverChannelEntry.getValue();
try {
serverChannel.close().awaitUninterruptibly();
} finally {
serverChannel = null;
} catch (Throwable t) {
logger.debug("Error closing serverChannel for profile [{}]", t, name);
}
serverChannelIterator.remove();
}
if (serverOpenChannels != null) {
@ -394,9 +511,19 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverOpenChannels = null;
}
if (serverBootstrap != null) {
serverBootstrap.releaseExternalResources();
serverBootstrap = null;
Iterator<Map.Entry<String, ServerBootstrap>> serverBootstrapIterator = serverBootstraps.entrySet().iterator();
while (serverBootstrapIterator.hasNext()) {
Map.Entry<String, ServerBootstrap> serverBootstrapEntry = serverBootstrapIterator.next();
String name = serverBootstrapEntry.getKey();
ServerBootstrap serverBootstrap = serverBootstrapEntry.getValue();
try {
serverBootstrap.releaseExternalResources();
} catch (Throwable t) {
logger.debug("Error closing serverBootstrap for profile [{}]", t, name);
}
serverBootstrapIterator.remove();
}
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
@ -445,7 +572,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
index = address.lastIndexOf(':');
if (index == -1) {
List<TransportAddress> addresses = Lists.newArrayList();
int[] iPorts = new PortsRange(this.port).ports();
String defaultPort = settings.get("transport.profiles.default.port", componentSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE)));
int[] iPorts = new PortsRange(defaultPort).ports();
for (int iPort : iPorts) {
addresses.add(new InetSocketTransportAddress(address, iPort));
}
@ -851,16 +979,20 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ServerChannelPipeFactory(this);
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings settings) {
return new ServerChannelPipelineFactory(this, name, settings);
}
protected static class ServerChannelPipeFactory implements ChannelPipelineFactory {
protected static class ServerChannelPipelineFactory implements ChannelPipelineFactory {
protected NettyTransport nettyTransport;
protected final NettyTransport nettyTransport;
protected final String name;
protected final Settings settings;
public ServerChannelPipeFactory(NettyTransport nettyTransport) {
public ServerChannelPipelineFactory(NettyTransport nettyTransport, String name, Settings settings) {
this.nettyTransport = nettyTransport;
this.name = name;
this.settings = settings;
}
@Override

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.util.Locale;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.is;
/**
*
*/
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1)
public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegrationTest {
private final int randomPort = randomIntBetween(1025, 65000);
private final String randomPortRange = String.format(Locale.ROOT, "%s-%s", randomPort, randomPort+100);
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder()
.put(super.nodeSettings(nodeOrdinal))
.put("network.host", "127.0.0.1")
.put("node.mode", "network")
.put("transport.profiles.client1.port", randomPortRange)
.put("transport.profiles.client1.reuse_address", true)
.build();
}
@Test
@Network
@TestLogging("transport.netty:DEBUG")
public void testThatTransportClientCanConnect() throws Exception {
Settings settings = settingsBuilder().put("cluster.name", internalCluster().getClusterName()).build();
try (TransportClient transportClient = new TransportClient(settings)) {
transportClient.addTransportAddress(new InetSocketTransportAddress("localhost", randomPort));
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));
}
}
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import org.elasticsearch.Version;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.metadata.BenchmarkMetaData;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Set;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.is;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyTransportMultiPortTests extends ElasticsearchTestCase {
private NettyTransport nettyTransport;
private ThreadPool threadPool;
@After
public void shutdownNettyTransport() {
if (nettyTransport != null) {
nettyTransport.stop();
}
if (threadPool != null) {
threadPool.shutdownNow();
}
}
@Test
@TestLogging("shield.transport.netty:DEBUG")
public void testThatNettyCanBindToMultiplePorts() throws Exception {
int[] ports = getRandomPorts(3);
Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
.put("transport.profiles.default.port", ports[1])
.put("transport.profiles.client1.port", ports[2])
.build();
startNettyTransport(settings);
assertConnectionRefused(ports[0]);
assertPortIsBound(ports[1]);
assertPortIsBound(ports[2]);
}
@Test
@TestLogging("transport.netty:DEBUG")
public void testThatDefaultProfileInheritsFromStandardSettings() throws Exception {
int[] ports = getRandomPorts(2);
Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
.put("transport.profiles.client1.port", ports[1])
.build();
startNettyTransport(settings);
assertPortIsBound(ports[0]);
assertPortIsBound(ports[1]);
}
@Test
@TestLogging("transport.netty:DEBUG")
public void testThatProfileWithoutPortSettingsFails() throws Exception {
int[] ports = getRandomPorts(1);
Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
.put("transport.profiles.client1.whatever", "foo")
.build();
startNettyTransport(settings);
assertPortIsBound(ports[0]);
}
@Test
@TestLogging("transport.netty:DEBUG")
public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exception {
int[] ports = getRandomPorts(3);
Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
.put("transport.netty.port", ports[1])
.put("transport.profiles.default.port", ports[2])
.build();
startNettyTransport(settings);
assertConnectionRefused(ports[0]);
assertConnectionRefused(ports[1]);
assertPortIsBound(ports[2]);
}
@Test
@Network
public void testThatBindingOnDifferentHostsWorks() throws Exception {
int[] ports = getRandomPorts(2);
InetAddress firstNonLoopbackAddress = NetworkUtils.getFirstNonLoopbackAddress(NetworkUtils.StackType.IPv4);
Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
.put("transport.profiles.default.bind_host", "127.0.0.1")
.put("transport.profiles.client1.bind_host", firstNonLoopbackAddress.getHostAddress())
.put("transport.profiles.client1.port", ports[1])
.build();
startNettyTransport(settings);
assertPortIsBound("127.0.0.1", ports[0]);
assertPortIsBound(firstNonLoopbackAddress.getHostAddress(), ports[1]);
assertConnectionRefused(ports[1]);
}
// TODO, make sure one can check more settings and that they are applied correctly
private int[] getRandomPorts(int numberOfPorts) {
IntOpenHashSet ports = new IntOpenHashSet();
for (int i = 0; i < numberOfPorts; i++) {
int port = randomIntBetween(1025, 65000);
while (ports.contains(port)) {
port = randomIntBetween(1025, 65000);
}
ports.add(port);
}
return ports.toArray();
}
private void startNettyTransport(Settings settings) {
threadPool = new ThreadPool("tst");
BigArrays bigArrays = new MockBigArrays(settings, new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT);
nettyTransport.start();
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
}
private void assertConnectionRefused(int port) throws Exception {
try {
trySocketConnection(new InetSocketTransportAddress("localhost", port).address());
fail("Expected to get exception when connecting to port " + port);
} catch (IOException e) {
// expected
logger.info("Got expected connection message {}", e.getMessage());
}
}
private void assertPortIsBound(int port) throws Exception {
assertPortIsBound("localhost", port);
}
private void assertPortIsBound(String host, int port) throws Exception {
logger.info("Trying to connect to [{}]:[{}]", host, port);
trySocketConnection(new InetSocketTransportAddress(host, port).address());
}
private void trySocketConnection(InetSocketAddress address) throws Exception {
try (Socket socket = new Socket()) {
logger.info("Connecting to {}", address);
socket.connect(address, 500);
assertThat(socket.isConnected(), is(true));
try (OutputStream os = socket.getOutputStream()) {
os.write("foo".getBytes(Charsets.UTF_8));
os.flush();
}
}
}
}

View File

@ -90,16 +90,16 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
}
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ErrorPipelineFactory(this);
public ChannelPipelineFactory configureServerChannelPipelineFactory(String name, Settings groupSettings) {
return new ErrorPipelineFactory(this, name, groupSettings);
}
private static class ErrorPipelineFactory extends ServerChannelPipeFactory {
private static class ErrorPipelineFactory extends ServerChannelPipelineFactory {
private final ESLogger logger;
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) {
super(exceptionThrowingNettyTransport);
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport, String name, Settings groupSettings) {
super(exceptionThrowingNettyTransport, name, groupSettings);
this.logger = exceptionThrowingNettyTransport.logger;
}