diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 124bd607ab7..ab078ad10d3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -29,8 +29,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.transport.netty4.Netty4Utils; -import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY; - @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler> { @@ -42,7 +40,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) throws Exception { - Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get(); + Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); try { @@ -77,12 +75,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler closeContext = new CompletableContext<>(); + + Netty4HttpServerChannel(Channel channel) { + this.channel = channel; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public String toString() { + return "Netty4HttpChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8a49ce38b89..34f00c06840 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -23,6 +23,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -42,22 +43,19 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AttributeKey; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; -import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; -import org.elasticsearch.http.HttpStats; +import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; @@ -65,14 +63,9 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.netty4.Netty4Utils; -import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -154,12 +147,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final int pipeliningMaxEvents; - private final boolean tcpNoDelay; - private final boolean tcpKeepAlive; - private final boolean reuseAddress; - - private final ByteSizeValue tcpSendBufferSize; - private final ByteSizeValue tcpReceiveBufferSize; private final RecvByteBufAllocator recvByteBufAllocator; private final int readTimeoutMillis; @@ -167,8 +154,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { protected volatile ServerBootstrap serverBootstrap; - protected final List serverChannels = new ArrayList<>(); - private final Netty4CorsConfig corsConfig; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, @@ -184,11 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings); - this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings); - this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings); - this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings); - this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings); - this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings); this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); @@ -217,6 +197,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(configureServerChannelHandler()); + serverBootstrap.handler(new ServerChannelExceptionHandler(this)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); @@ -238,10 +219,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); - this.boundAddress = createBoundHttpAddress(); - if (logger.isInfoEnabled()) { - logger.info("{}", boundAddress); - } + bindServer(); success = true; } finally { if (success == false) { @@ -284,78 +262,29 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } @Override - protected TransportAddress bindAddress(final InetAddress hostAddress) { - final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = port.iterate(portNumber -> { - try { - synchronized (serverChannels) { - ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync(); - serverChannels.add(future.channel()); - boundSocket.set((InetSocketAddress) future.channel().localAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - }); - if (!success) { - throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); - } - return new TransportAddress(boundSocket.get()); + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { + ChannelFuture future = serverBootstrap.bind(socketAddress).sync(); + Channel channel = future.channel(); + Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel); + channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel); + return httpServerChannel; } @Override - protected void doStop() { - synchronized (serverChannels) { - if (!serverChannels.isEmpty()) { - try { - Netty4Utils.closeChannels(serverChannels); - } catch (IOException e) { - logger.trace("exception while closing channels", e); - } finally { - serverChannels.clear(); - } - } - } - - // TODO: Move all of channel closing to abstract class once server channels are handled - try { - CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); - } catch (Exception e) { - logger.warn("unexpected exception while closing http channels", e); - } - httpChannels.clear(); - - - + protected void stopInternal() { if (serverBootstrap != null) { serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); serverBootstrap = null; } } - @Override - protected void doClose() { - } - - @Override - public HttpStats stats() { - return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); - } - @Override protected void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { if (logger.isTraceEnabled()) { logger.trace("Http read timeout {}", channel); } - CloseableChannel.closeChannel(channel);; + CloseableChannel.closeChannel(channel); } else { super.onException(channel, cause); } @@ -366,6 +295,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel"); + static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel"); protected static class HttpChannelHandler extends ChannelInitializer { @@ -413,4 +343,24 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } } + @ChannelHandler.Sharable + private static class ServerChannelExceptionHandler extends ChannelHandlerAdapter { + + private final Netty4HttpServerTransport transport; + + private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) { + this.transport = transport; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Netty4Utils.maybeDie(cause); + Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + transport.onServerException(httpServerChannel, new Exception(cause)); + } else { + transport.onServerException(httpServerChannel, (Exception) cause); + } + } + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 58440ae96e0..698c86d048c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -24,6 +24,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.Transports; @@ -36,11 +38,9 @@ import org.elasticsearch.transport.Transports; final class Netty4MessageChannelHandler extends ChannelDuplexHandler { private final Netty4Transport transport; - private final String profileName; - Netty4MessageChannelHandler(Netty4Transport transport, String profileName) { + Netty4MessageChannelHandler(Netty4Transport transport) { this.transport = transport; - this.profileName = profileName; } @Override @@ -58,7 +58,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh // buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize); - Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); + Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); transport.messageReceived(reference, channelAttribute.get()); } finally { // Set the expected position of the buffer, no matter what happened @@ -69,7 +69,13 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Netty4Utils.maybeDie(cause); - transport.exceptionCaught(ctx, cause); + final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); + final Throwable newCause = unwrapped != null ? unwrapped : cause; + Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); + if (newCause instanceof Error) { + transport.onException(tcpChannel, new Exception(newCause)); + } else { + transport.onException(tcpChannel, (Exception) newCause); + } } - } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java similarity index 96% rename from modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java rename to modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 89fabdcd763..78a14255000 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -30,13 +30,13 @@ import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; -public class NettyTcpChannel implements TcpChannel { +public class Netty4TcpChannel implements TcpChannel { private final Channel channel; private final String profile; private final CompletableContext closeContext = new CompletableContext<>(); - NettyTcpChannel(Channel channel, String profile) { + Netty4TcpChannel(Channel channel, String profile) { this.channel = channel; this.profile = profile; this.channel.closeFuture().addListener(f -> { @@ -118,7 +118,7 @@ public class NettyTcpChannel implements TcpChannel { @Override public String toString() { - return "NettyTcpChannel{" + + return "Netty4TcpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + channel.remoteAddress() + '}'; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java new file mode 100644 index 00000000000..873a6c33fba --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.netty4; + +import io.netty.channel.Channel; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.concurrent.CompletableContext; +import org.elasticsearch.transport.TcpServerChannel; + +import java.net.InetSocketAddress; + +public class Netty4TcpServerChannel implements TcpServerChannel { + + private final Channel channel; + private final String profile; + private final CompletableContext closeContext = new CompletableContext<>(); + + Netty4TcpServerChannel(Channel channel, String profile) { + this.channel = channel; + this.profile = profile; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); + } + + @Override + public String getProfile() { + return profile; + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public String toString() { + return "Netty4TcpChannel{" + + "localAddress=" + getLocalAddress() + + '}'; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 466c4b68bfa..c8c6fceb543 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -37,8 +38,6 @@ import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; @@ -196,6 +195,7 @@ public class Netty4Transport extends TcpTransport { serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(getServerChannelInitializer(name)); + serverBootstrap.handler(new ServerChannelExceptionHandler()); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive); @@ -226,17 +226,11 @@ public class Netty4Transport extends TcpTransport { return new ClientChannelInitializer(); } - static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); - - protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); - final Throwable t = unwrapped != null ? unwrapped : cause; - Channel channel = ctx.channel(); - onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t)); - } + static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); + static final AttributeKey SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel"); @Override - protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { + protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { ChannelFuture channelFuture = bootstrap.connect(address); Channel channel = channelFuture.channel(); if (channel == null) { @@ -245,7 +239,7 @@ public class Netty4Transport extends TcpTransport { } addClosedExceptionLogger(channel); - NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default"); + Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default"); channel.attr(CHANNEL_KEY).set(nettyChannel); channelFuture.addListener(f -> { @@ -266,10 +260,10 @@ public class Netty4Transport extends TcpTransport { } @Override - protected NettyTcpChannel bind(String name, InetSocketAddress address) { + protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel(); - NettyTcpChannel esChannel = new NettyTcpChannel(channel, name); - channel.attr(CHANNEL_KEY).set(esChannel); + Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel, name); + channel.attr(SERVER_CHANNEL_KEY).set(esChannel); return esChannel; } @@ -310,7 +304,7 @@ public class Netty4Transport extends TcpTransport { ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this)); } @Override @@ -331,11 +325,11 @@ public class Netty4Transport extends TcpTransport { @Override protected void initChannel(Channel ch) throws Exception { addClosedExceptionLogger(ch); - NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name); + Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this)); serverAcceptedChannel(nettyTcpChannel); } @@ -353,4 +347,19 @@ public class Netty4Transport extends TcpTransport { } }); } + + @ChannelHandler.Sharable + private class ServerChannelExceptionHandler extends ChannelHandlerAdapter { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Netty4Utils.maybeDie(cause); + Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + onServerException(serverChannel, new Exception(cause)); + } else { + onServerException(serverChannel, (Exception) cause); + } + } + } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 7343da6c3b1..4c783cf0787 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -70,7 +70,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase { nettyTransport.start(); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); - TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses); + TransportAddress transportAddress = randomFrom(boundAddresses); port = transportAddress.address().getPort(); host = transportAddress.address().getAddress(); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java new file mode 100644 index 00000000000..2674d38dc49 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.http.HttpServerChannel; +import org.elasticsearch.nio.NioServerSocketChannel; + +import java.io.IOException; +import java.nio.channels.ServerSocketChannel; + +public class NioHttpServerChannel extends NioServerSocketChannel implements HttpServerChannel { + + NioHttpServerChannel(ServerSocketChannel serverSocketChannel) throws IOException { + super(serverSocketChannel); + } + + @Override + public void addCloseListener(ActionListener listener) { + addCloseListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public String toString() { + return "NioHttpServerChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index aa0859e6146..b80778e9642 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -21,40 +21,29 @@ package org.elasticsearch.http.nio; import io.netty.handler.codec.http.HttpMethod; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; -import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpChannel; +import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.HttpServerTransport; -import org.elasticsearch.http.HttpStats; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioChannel; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; -import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; @@ -62,18 +51,11 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -113,7 +95,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private final int tcpSendBufferSize; private final int tcpReceiveBufferSize; - private final Set serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private NioGroup nioGroup; private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; @@ -156,12 +137,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount, (s) -> new EventHandler(this::onNonChannelException, s)); channelFactory = new HttpChannelFactory(); - this.boundAddress = createBoundHttpAddress(); - - if (logger.isInfoEnabled()) { - logger.info("{}", boundAddress); - } - + bindServer(); success = true; } catch (IOException e) { throw new ElasticsearchException(e); @@ -173,26 +149,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { } @Override - protected void doStop() { - synchronized (serverChannels) { - if (serverChannels.isEmpty() == false) { - try { - closeChannels(new ArrayList<>(serverChannels)); - } catch (Exception e) { - logger.error("unexpected exception while closing http server channels", e); - } - serverChannels.clear(); - } - } - - // TODO: Move all of channel closing to abstract class once server channels are handled - try { - CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); - } catch (Exception e) { - logger.warn("unexpected exception while closing http channels", e); - } - httpChannels.clear(); - + protected void stopInternal() { try { nioGroup.close(); } catch (Exception e) { @@ -201,40 +158,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { } @Override - protected void doClose() throws IOException { - } - - @Override - protected TransportAddress bindAddress(InetAddress hostAddress) { - final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = port.iterate(portNumber -> { - try { - synchronized (serverChannels) { - InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber); - NioServerSocketChannel channel = nioGroup.bindServerChannel(address, channelFactory); - serverChannels.add(channel); - boundSocket.set(channel.getLocalAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - }); - if (success == false) { - throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); - } - return new TransportAddress(boundSocket.get()); - } - - @Override - public HttpStats stats() { - return new HttpStats(serverChannels.size(), totalChannelsAccepted.get()); + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOException { + return nioGroup.bindServerChannel(socketAddress, channelFactory); } static NioCorsConfig buildCorsConfig(Settings settings) { @@ -269,33 +194,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { .build(); } - private void closeChannels(List channels) { - List> futures = new ArrayList<>(channels.size()); - - for (NioChannel channel : channels) { - PlainActionFuture future = PlainActionFuture.newFuture(); - channel.addCloseListener(ActionListener.toBiConsumer(future)); - futures.add(future); - channel.close(); - } - - List closeExceptions = new ArrayList<>(); - for (ActionFuture f : futures) { - try { - f.actionGet(); - } catch (RuntimeException e) { - closeExceptions.add(e); - } - } - - ExceptionsHelper.rethrowAndSuppress(closeExceptions); - } - private void acceptChannel(NioSocketChannel socketChannel) { super.serverAcceptedChannel((HttpChannel) socketChannel); } - private class HttpChannelFactory extends ChannelFactory { + private class HttpChannelFactory extends ChannelFactory { private HttpChannelFactory() { super(new RawChannelFactory(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize)); @@ -303,29 +206,28 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { @Override public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { - NioHttpChannel nioChannel = new NioHttpChannel(channel); + NioHttpChannel httpChannel = new NioHttpChannel(channel); java.util.function.Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; - HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, + HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this, handlingSettings, corsConfig); - Consumer exceptionHandler = (e) -> onException(nioChannel, e); - SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, + Consumer exceptionHandler = (e) -> onException(httpChannel, e); + SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline, new InboundChannelBuffer(pageSupplier)); - nioChannel.setContext(context); - return nioChannel; + httpChannel.setContext(context); + return httpChannel; } @Override - public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + public NioHttpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioHttpServerChannel httpServerChannel = new NioHttpServerChannel(channel); + Consumer exceptionHandler = (e) -> onServerException(httpServerChannel, e); Consumer acceptor = NioHttpServerTransport.this::acceptChannel; - ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); - nioChannel.setContext(context); - return nioChannel; + ServerChannelContext context = new ServerChannelContext(httpServerChannel, this, selector, acceptor, exceptionHandler); + httpServerChannel.setContext(context); + return httpServerChannel; } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java index 10bf4ed7523..3c6d4b12df9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java @@ -20,19 +20,17 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.nio.NioServerSocketChannel; -import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TcpServerChannel; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; /** - * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel} + * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpServerChannel} * interface. As it is a server socket, setting SO_LINGER and sending messages is not supported. */ -public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel { +public class NioTcpServerChannel extends NioServerSocketChannel implements TcpServerChannel { private final String profile; @@ -41,21 +39,6 @@ public class NioTcpServerChannel extends NioServerSocketChannel implements TcpCh this.profile = profile; } - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - throw new UnsupportedOperationException("Cannot send a message to a server channel."); - } - - @Override - public void setSoLinger(int value) throws IOException { - throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - @Override public void close() { getContext().closeChannel(); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index cf7d37493cb..47229a0df2f 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,7 +19,6 @@ package org.elasticsearch.transport.nio; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -176,8 +175,7 @@ public class NioTransport extends TcpTransport { @Override public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + Consumer exceptionHandler = (e) -> onServerException(nioChannel, e); Consumer acceptor = NioTransport.this::acceptChannel; ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context); diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 9d9008f7fb8..622020d6451 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.network.CloseableChannel; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -53,6 +54,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; @@ -74,9 +76,10 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo private final String[] bindHosts; private final String[] publishHosts; - protected final AtomicLong totalChannelsAccepted = new AtomicLong(); - protected final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - protected volatile BoundTransportAddress boundAddress; + private volatile BoundTransportAddress boundAddress; + private final AtomicLong totalChannelsAccepted = new AtomicLong(); + private final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { @@ -116,7 +119,12 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo return new HttpInfo(boundTransportAddress, maxContentLength.getBytes()); } - protected BoundTransportAddress createBoundHttpAddress() { + @Override + public HttpStats stats() { + return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); + } + + protected void bindServer() { // Bind and start to accept incoming connections. InetAddress hostAddresses[]; try { @@ -138,11 +146,71 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo } final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress); - final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort); - return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), new TransportAddress(publishAddress)); + TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); + this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress); + logger.info("{}", boundAddress); } - protected abstract TransportAddress bindAddress(InetAddress hostAddress); + private TransportAddress bindAddress(final InetAddress hostAddress) { + final AtomicReference lastException = new AtomicReference<>(); + final AtomicReference boundSocket = new AtomicReference<>(); + boolean success = port.iterate(portNumber -> { + try { + synchronized (httpServerChannels) { + HttpServerChannel httpServerChannel = bind(new InetSocketAddress(hostAddress, portNumber)); + httpServerChannels.add(httpServerChannel); + boundSocket.set(httpServerChannel.getLocalAddress()); + } + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + }); + if (!success) { + throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); + } + + if (logger.isDebugEnabled()) { + logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); + } + return new TransportAddress(boundSocket.get()); + } + + protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception; + + @Override + protected void doStop() { + synchronized (httpServerChannels) { + if (httpServerChannels.isEmpty() == false) { + try { + CloseableChannel.closeChannels(new ArrayList<>(httpServerChannels), true); + } catch (Exception e) { + logger.warn("exception while closing channels", e); + } finally { + httpServerChannels.clear(); + } + } + } + + try { + CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); + } catch (Exception e) { + logger.warn("unexpected exception while closing http channels", e); + } + httpChannels.clear(); + + stopInternal(); + } + + @Override + protected void doClose() { + } + + /** + * Called to tear down internal resources + */ + protected abstract void stopInternal(); // package private for tests static int resolvePublishPort(Settings settings, List boundAddresses, InetAddress publishInetAddress) { @@ -197,19 +265,23 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo CloseableChannel.closeChannel(channel); } else { logger.warn(() -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", channel), e); + "caught exception while handling client http traffic, closing connection {}", channel), e); CloseableChannel.closeChannel(channel); } } + protected void onServerException(HttpServerChannel channel, Exception e) { + logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e); + } + /** * Exception handler for exceptions that are not associated with a specific channel. * * @param exception the exception */ protected void onNonChannelException(Exception exception) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), - exception); + String threadName = Thread.currentThread().getName(); + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception); } protected void serverAcceptedChannel(HttpChannel httpChannel) { diff --git a/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java b/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java new file mode 100644 index 00000000000..e4222ae8168 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.common.network.CloseableChannel; + +import java.net.InetSocketAddress; + +public interface HttpServerChannel extends CloseableChannel { + + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java new file mode 100644 index 00000000000..408ec1af20b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.network.CloseableChannel; + +import java.net.InetSocketAddress; + + +/** + * This is a tcp channel representing a server channel listening for new connections. It is the server + * channel abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport + * implementations must return server channels that adhere to the required method contracts. + */ +public interface TcpServerChannel extends CloseableChannel { + + /** + * This returns the profile for this channel. + */ + String getProfile(); + + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); + +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index bd862c19e9c..c8f256c2db8 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -21,9 +21,6 @@ package org.elasticsearch.transport; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; @@ -31,6 +28,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -52,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -68,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.rest.RestStatus; @@ -210,7 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); // node id to actual channel private final ConcurrentMap connectedNodes = newConcurrentMap(); - private final Map> serverChannels = newConcurrentMap(); + private final Map> serverChannels = newConcurrentMap(); private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final KeyedLock connectionLock = new KeyedLock<>(); @@ -792,9 +792,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final AtomicReference boundSocket = new AtomicReference<>(); boolean success = portsRange.iterate(portNumber -> { try { - TcpChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); + TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); synchronized (serverChannels) { - List list = serverChannels.get(name); + List list = serverChannels.get(name); if (list == null) { list = new ArrayList<>(); serverChannels.put(name, list); @@ -957,9 +957,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements closeLock.writeLock().lock(); try { // first stop to accept any incoming connections so nobody can connect to this transport - for (Map.Entry> entry : serverChannels.entrySet()) { + for (Map.Entry> entry : serverChannels.entrySet()) { String profile = entry.getKey(); - List channels = entry.getValue(); + List channels = entry.getValue(); ActionListener closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); channels.forEach(c -> c.addCloseListener(closeFailLogger)); @@ -999,7 +999,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - protected void onException(TcpChannel channel, Exception e) { + public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); @@ -1049,6 +1049,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } + protected void onServerException(TcpServerChannel channel, Exception e) { + logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e); + } + /** * Exception handler for exceptions that are not associated with a specific channel. * @@ -1072,7 +1076,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * @param name the profile name * @param address the address to bind to */ - protected abstract TcpChannel bind(String name, InetSocketAddress address) throws IOException; + protected abstract TcpServerChannel bind(String name, InetSocketAddress address) throws IOException; /** * Initiate a single tcp socket channel. @@ -1087,8 +1091,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements /** * Called to tear down internal resources */ - protected void stopInternal() { - } + protected abstract void stopInternal(); public boolean canCompress(TransportRequest request) { return compress && (!(request instanceof BytesTransportRequest)); diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index a7629e5f48b..ece9fd503c1 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -35,8 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; -import java.io.IOException; -import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -128,8 +127,9 @@ public class AbstractHttpServerTransportTests extends ESTestCase { try (AbstractHttpServerTransport transport = new AbstractHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher) { + @Override - protected TransportAddress bindAddress(InetAddress hostAddress) { + protected HttpServerChannel bind(InetSocketAddress hostAddress) { return null; } @@ -139,12 +139,7 @@ public class AbstractHttpServerTransportTests extends ESTestCase { } @Override - protected void doStop() { - - } - - @Override - protected void doClose() throws IOException { + protected void stopInternal() { } diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 2328aa46363..d16300bf266 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -193,6 +193,10 @@ public class TcpTransportTests extends ESTestCase { return new FakeChannel(messageCaptor); } + @Override + protected void stopInternal() { + } + @Override public NodeChannels getConnection(DiscoveryNode node) { int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections(); @@ -237,7 +241,7 @@ public class TcpTransportTests extends ESTestCase { } } - private static final class FakeChannel implements TcpChannel { + private static final class FakeChannel implements TcpChannel, TcpServerChannel { private final AtomicReference messageCaptor; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 8831c46c011..bbff340c860 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -225,7 +225,7 @@ public class MockTcpTransport extends TcpTransport { socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); } - public final class MockChannel implements Closeable, TcpChannel { + public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel { private final AtomicBoolean isOpen = new AtomicBoolean(true); private final InetSocketAddress localAddress; private final ServerSocket serverSocket; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index cb9e243660a..2ab8719c334 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TcpServerChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; @@ -191,7 +192,7 @@ public class MockNioTransport extends TcpTransport { } } - private static class MockServerChannel extends NioServerSocketChannel implements TcpChannel { + private static class MockServerChannel extends NioServerSocketChannel implements TcpServerChannel { private final String profile; @@ -215,21 +216,6 @@ public class MockNioTransport extends TcpTransport { public void addCloseListener(ActionListener listener) { addCloseListener(ActionListener.toBiConsumer(listener)); } - - @Override - public void setSoLinger(int value) throws IOException { - throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - throw new UnsupportedOperationException("Cannot send a message to a server channel."); - } } private static class MockSocketChannel extends NioSocketChannel implements TcpChannel { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index ce06712722c..b761439b15b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -109,7 +109,7 @@ public class SecurityNetty4Transport extends Netty4Transport { } @Override - protected void onException(TcpChannel channel, Exception e) { + public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index 161ac3678ae..9427812ba13 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -24,7 +24,7 @@ import org.elasticsearch.transport.TcpTransportChannel; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.netty4.NettyTcpChannel; +import org.elasticsearch.transport.netty4.Netty4TcpChannel; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.user.KibanaUser; @@ -116,8 +116,8 @@ public interface ServerTransportFilter { } if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel) && - ((TcpTransportChannel) unwrappedChannel).getChannel() instanceof NettyTcpChannel) { - Channel channel = ((NettyTcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel(); + ((TcpTransportChannel) unwrappedChannel).getChannel() instanceof Netty4TcpChannel) { + Channel channel = ((Netty4TcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel(); SslHandler sslHandler = channel.pipeline().get(SslHandler.class); if (channel.isOpen()) { assert sslHandler != null : "channel [" + channel + "] did not have a ssl handler. pipeline " + channel.pipeline(); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 5315a944f77..fd1b1198607 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.security.transport.nio; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; @@ -131,9 +130,8 @@ public class SecurityNioTransport extends NioTransport { @Override public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);; + Consumer exceptionHandler = (e) -> onServerException(nioChannel, e); Consumer acceptor = SecurityNioTransport.this::acceptChannel; ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context);