Ensure port range is readable in the exception message (#20893)
Both netty3 and netty4 http implementation printed the default toString representation of PortRange if ports couldn't be bound. This commit adds a better default toString method to PortRange and uses the string representation for the error message in the http implementations.
This commit is contained in:
parent
968fbaceef
commit
12392b5425
|
@ -80,4 +80,11 @@ public class PortsRange {
|
|||
public interface PortCallback {
|
||||
boolean onPortNumber(int portNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PortsRange{" +
|
||||
"portRange='" + portRange + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -281,34 +281,42 @@ public class Netty3HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.serverOpenChannels = new Netty3OpenChannelsHandler(logger);
|
||||
if (blockingServer) {
|
||||
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX)),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))
|
||||
));
|
||||
} else {
|
||||
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX)),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)),
|
||||
workerCount));
|
||||
}
|
||||
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
|
||||
boolean success = false;
|
||||
try {
|
||||
this.serverOpenChannels = new Netty3OpenChannelsHandler(logger);
|
||||
if (blockingServer) {
|
||||
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX)),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))
|
||||
));
|
||||
} else {
|
||||
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX)),
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)),
|
||||
workerCount));
|
||||
}
|
||||
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
|
||||
|
||||
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
|
||||
serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
|
||||
if (tcpSendBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
|
||||
serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
|
||||
if (tcpSendBufferSize.getBytes() > 0) {
|
||||
|
||||
serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.getBytes());
|
||||
serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.getBytes());
|
||||
}
|
||||
if (tcpReceiveBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.getBytes());
|
||||
}
|
||||
serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||
serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||
serverBootstrap.setOption("reuseAddress", reuseAddress);
|
||||
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
|
||||
this.boundAddress = createBoundHttpAddress();
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
doStop(); // otherwise we leak threads since we never moved to started
|
||||
}
|
||||
}
|
||||
if (tcpReceiveBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.getBytes());
|
||||
}
|
||||
serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||
serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||
serverBootstrap.setOption("reuseAddress", reuseAddress);
|
||||
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
|
||||
this.boundAddress = createBoundHttpAddress();
|
||||
}
|
||||
|
||||
private BoundTransportAddress createBoundHttpAddress() {
|
||||
|
@ -402,24 +410,21 @@ public class Netty3HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
private TransportAddress bindAddress(final InetAddress hostAddress) {
|
||||
final AtomicReference<Exception> lastException = new AtomicReference<>();
|
||||
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
|
||||
boolean success = port.iterate(new PortsRange.PortCallback() {
|
||||
@Override
|
||||
public boolean onPortNumber(int portNumber) {
|
||||
try {
|
||||
synchronized (serverChannels) {
|
||||
Channel channel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
|
||||
serverChannels.add(channel);
|
||||
boundSocket.set((InetSocketAddress) channel.getLocalAddress());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
lastException.set(e);
|
||||
return false;
|
||||
boolean success = port.iterate(portNumber -> {
|
||||
try {
|
||||
synchronized (serverChannels) {
|
||||
Channel channel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
|
||||
serverChannels.add(channel);
|
||||
boundSocket.set((InetSocketAddress) channel.getLocalAddress());
|
||||
}
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
lastException.set(e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
if (!success) {
|
||||
throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
|
||||
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -22,8 +22,10 @@ package org.elasticsearch.http.netty3;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.http.BindHttpException;
|
||||
import org.elasticsearch.http.netty3.cors.Netty3CorsConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -103,4 +105,17 @@ public class Netty3HttpServerTransportTests extends ESTestCase {
|
|||
assertThat(corsConfig.allowedRequestMethods().stream().map(HttpMethod::getName).collect(Collectors.toSet()), equalTo(methods));
|
||||
transport.close();
|
||||
}
|
||||
|
||||
public void testBindUnavailableAddress() {
|
||||
try (Netty3HttpServerTransport transport = new Netty3HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool)) {
|
||||
transport.start();
|
||||
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
|
||||
Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build();
|
||||
try (Netty3HttpServerTransport otherTransport = new Netty3HttpServerTransport(settings, networkService, bigArrays,
|
||||
threadPool)) {
|
||||
BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start());
|
||||
assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,9 +28,11 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -78,4 +80,26 @@ public class SimpleNetty3TransportTests extends AbstractSimpleTransportTestCase
|
|||
assertThat(e.getMessage(), containsString("[127.0.0.1:9876]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testBindUnavailableAddress() {
|
||||
// this is on a lower level since it needs access to the TransportService before it's started
|
||||
int port = serviceA.boundAddress().publishAddress().getPort();
|
||||
Settings settings = Settings.builder()
|
||||
.put(Node.NODE_NAME_SETTING.getKey(), "foobar")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.put("transport.tcp.port", port)
|
||||
.build();
|
||||
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
|
||||
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings);
|
||||
try {
|
||||
transportService.start();
|
||||
} finally {
|
||||
transportService.stop();
|
||||
transportService.close();
|
||||
}
|
||||
});
|
||||
assertEquals("Failed to bind to ["+ port + "]", bindTransportException.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -285,40 +285,50 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);
|
||||
boolean success = false;
|
||||
try {
|
||||
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);
|
||||
|
||||
serverBootstrap = new ServerBootstrap();
|
||||
if (blockingServer) {
|
||||
serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
|
||||
serverBootstrap.channel(OioServerSocketChannel.class);
|
||||
} else {
|
||||
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
serverBootstrap = new ServerBootstrap();
|
||||
if (blockingServer) {
|
||||
serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings,
|
||||
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
|
||||
serverBootstrap.channel(OioServerSocketChannel.class);
|
||||
} else {
|
||||
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
|
||||
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
|
||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||
}
|
||||
|
||||
serverBootstrap.childHandler(configureServerChannelHandler());
|
||||
|
||||
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
|
||||
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
|
||||
|
||||
final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
|
||||
if (tcpSendBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
|
||||
}
|
||||
|
||||
final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
|
||||
if (tcpReceiveBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
|
||||
}
|
||||
|
||||
serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
|
||||
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
|
||||
|
||||
final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
|
||||
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
|
||||
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
|
||||
|
||||
this.boundAddress = createBoundHttpAddress();
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
doStop(); // otherwise we leak threads since we never moved to started
|
||||
}
|
||||
}
|
||||
|
||||
serverBootstrap.childHandler(configureServerChannelHandler());
|
||||
|
||||
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
|
||||
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
|
||||
|
||||
final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
|
||||
if (tcpSendBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
|
||||
}
|
||||
|
||||
final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
|
||||
if (tcpReceiveBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
|
||||
}
|
||||
|
||||
serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
|
||||
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
|
||||
|
||||
final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
|
||||
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
|
||||
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
|
||||
|
||||
this.boundAddress = createBoundHttpAddress();
|
||||
}
|
||||
|
||||
private BoundTransportAddress createBoundHttpAddress() {
|
||||
|
@ -417,24 +427,21 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
private TransportAddress bindAddress(final InetAddress hostAddress) {
|
||||
final AtomicReference<Exception> lastException = new AtomicReference<>();
|
||||
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
|
||||
boolean success = port.iterate(new PortsRange.PortCallback() {
|
||||
@Override
|
||||
public boolean onPortNumber(int portNumber) {
|
||||
try {
|
||||
synchronized (serverChannels) {
|
||||
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync();
|
||||
serverChannels.add(future.channel());
|
||||
boundSocket.set((InetSocketAddress) future.channel().localAddress());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
lastException.set(e);
|
||||
return false;
|
||||
boolean success = port.iterate(portNumber -> {
|
||||
try {
|
||||
synchronized (serverChannels) {
|
||||
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync();
|
||||
serverChannels.add(future.channel());
|
||||
boundSocket.set((InetSocketAddress) future.channel().localAddress());
|
||||
}
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
lastException.set(e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
if (!success) {
|
||||
throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
|
||||
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.http.BindHttpException;
|
||||
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.rest.BytesRestResponse;
|
||||
|
@ -123,7 +124,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
|
|||
transport.httpServerAdapter((request, channel, context) ->
|
||||
channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))));
|
||||
transport.start();
|
||||
TransportAddress remoteAddress = (TransportAddress) randomFrom(transport.boundAddress().boundAddresses());
|
||||
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
|
||||
|
||||
try (Netty4HttpClient client = new Netty4HttpClient()) {
|
||||
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
|
||||
|
@ -140,4 +141,17 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testBindUnavailableAddress() {
|
||||
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool)) {
|
||||
transport.start();
|
||||
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
|
||||
Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build();
|
||||
try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays,
|
||||
threadPool)) {
|
||||
BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start());
|
||||
assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,9 +28,11 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||
import org.elasticsearch.transport.BindTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -79,4 +81,26 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
|
|||
}
|
||||
}
|
||||
|
||||
public void testBindUnavailableAddress() {
|
||||
// this is on a lower level since it needs access to the TransportService before it's started
|
||||
int port = serviceA.boundAddress().publishAddress().getPort();
|
||||
Settings settings = Settings.builder()
|
||||
.put(Node.NODE_NAME_SETTING.getKey(), "foobar")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.put("transport.tcp.port", port)
|
||||
.build();
|
||||
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
|
||||
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings);
|
||||
try {
|
||||
transportService.start();
|
||||
} finally {
|
||||
transportService.stop();
|
||||
transportService.close();
|
||||
}
|
||||
});
|
||||
assertEquals("Failed to bind to ["+ port + "]", bindTransportException.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue