From 9ab1325953b30fb3d881e9c27b2afdd3274322b6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Jun 2018 16:34:56 -0600 Subject: [PATCH] Introduce http and tcp server channels (#31446) Historically in TcpTransport server channels were represented by the same channel interface as socket channels. This was necessary as TcpTransport was parameterized by the channel type. This commit introduces TcpServerChannel and HttpServerChannel classes. Additionally, it adds the implementations for the various transports. This allows server channels to have unique functionality and not implement the methods they do not support (such as send and getRemoteAddress). Additionally, with the introduction of HttpServerChannel this commit extracts some of the storing and closing channel work to the abstract http server transport. --- .../http/netty4/Netty4HttpRequestHandler.java | 11 +- .../http/netty4/Netty4HttpServerChannel.java | 76 ++++++++++ .../netty4/Netty4HttpServerTransport.java | 116 +++++---------- .../netty4/Netty4MessageChannelHandler.java | 18 ++- ...yTcpChannel.java => Netty4TcpChannel.java} | 6 +- .../netty4/Netty4TcpServerChannel.java | 84 +++++++++++ .../transport/netty4/Netty4Transport.java | 45 +++--- .../Netty4SizeHeaderFrameDecoderTests.java | 2 +- .../http/nio/NioHttpServerChannel.java | 44 ++++++ .../http/nio/NioHttpServerTransport.java | 134 +++--------------- .../transport/nio/NioTcpServerChannel.java | 23 +-- .../transport/nio/NioTransport.java | 4 +- .../http/AbstractHttpServerTransport.java | 92 ++++++++++-- .../elasticsearch/http/HttpServerChannel.java | 34 +++++ .../transport/TcpServerChannel.java | 46 ++++++ .../elasticsearch/transport/TcpTransport.java | 27 ++-- .../AbstractHttpServerTransportTests.java | 13 +- .../transport/TcpTransportTests.java | 6 +- .../transport/MockTcpTransport.java | 2 +- .../transport/nio/MockNioTransport.java | 18 +-- .../netty4/SecurityNetty4Transport.java | 2 +- .../transport/ServerTransportFilter.java | 6 +- .../transport/nio/SecurityNioTransport.java | 6 +- 23 files changed, 501 insertions(+), 314 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java rename modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/{NettyTcpChannel.java => Netty4TcpChannel.java} (96%) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java create mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java create mode 100644 server/src/main/java/org/elasticsearch/http/HttpServerChannel.java create mode 100644 server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 124bd607ab7..ab078ad10d3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -29,8 +29,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.transport.netty4.Netty4Utils; -import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY; - @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler> { @@ -42,7 +40,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) throws Exception { - Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get(); + Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); try { @@ -77,12 +75,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler closeContext = new CompletableContext<>(); + + Netty4HttpServerChannel(Channel channel) { + this.channel = channel; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public String toString() { + return "Netty4HttpChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8a49ce38b89..34f00c06840 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -23,6 +23,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -42,22 +43,19 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AttributeKey; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; -import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; -import org.elasticsearch.http.HttpStats; +import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; @@ -65,14 +63,9 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.netty4.Netty4Utils; -import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -154,12 +147,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final int pipeliningMaxEvents; - private final boolean tcpNoDelay; - private final boolean tcpKeepAlive; - private final boolean reuseAddress; - - private final ByteSizeValue tcpSendBufferSize; - private final ByteSizeValue tcpReceiveBufferSize; private final RecvByteBufAllocator recvByteBufAllocator; private final int readTimeoutMillis; @@ -167,8 +154,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { protected volatile ServerBootstrap serverBootstrap; - protected final List serverChannels = new ArrayList<>(); - private final Netty4CorsConfig corsConfig; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, @@ -184,11 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings); - this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings); - this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings); - this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings); - this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings); - this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings); this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); @@ -217,6 +197,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(configureServerChannelHandler()); + serverBootstrap.handler(new ServerChannelExceptionHandler(this)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); @@ -238,10 +219,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); - this.boundAddress = createBoundHttpAddress(); - if (logger.isInfoEnabled()) { - logger.info("{}", boundAddress); - } + bindServer(); success = true; } finally { if (success == false) { @@ -284,78 +262,29 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } @Override - protected TransportAddress bindAddress(final InetAddress hostAddress) { - final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = port.iterate(portNumber -> { - try { - synchronized (serverChannels) { - ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync(); - serverChannels.add(future.channel()); - boundSocket.set((InetSocketAddress) future.channel().localAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - }); - if (!success) { - throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); - } - return new TransportAddress(boundSocket.get()); + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { + ChannelFuture future = serverBootstrap.bind(socketAddress).sync(); + Channel channel = future.channel(); + Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel); + channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel); + return httpServerChannel; } @Override - protected void doStop() { - synchronized (serverChannels) { - if (!serverChannels.isEmpty()) { - try { - Netty4Utils.closeChannels(serverChannels); - } catch (IOException e) { - logger.trace("exception while closing channels", e); - } finally { - serverChannels.clear(); - } - } - } - - // TODO: Move all of channel closing to abstract class once server channels are handled - try { - CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); - } catch (Exception e) { - logger.warn("unexpected exception while closing http channels", e); - } - httpChannels.clear(); - - - + protected void stopInternal() { if (serverBootstrap != null) { serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); serverBootstrap = null; } } - @Override - protected void doClose() { - } - - @Override - public HttpStats stats() { - return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); - } - @Override protected void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { if (logger.isTraceEnabled()) { logger.trace("Http read timeout {}", channel); } - CloseableChannel.closeChannel(channel);; + CloseableChannel.closeChannel(channel); } else { super.onException(channel, cause); } @@ -366,6 +295,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel"); + static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel"); protected static class HttpChannelHandler extends ChannelInitializer { @@ -413,4 +343,24 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { } } + @ChannelHandler.Sharable + private static class ServerChannelExceptionHandler extends ChannelHandlerAdapter { + + private final Netty4HttpServerTransport transport; + + private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) { + this.transport = transport; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Netty4Utils.maybeDie(cause); + Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + transport.onServerException(httpServerChannel, new Exception(cause)); + } else { + transport.onServerException(httpServerChannel, (Exception) cause); + } + } + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 58440ae96e0..698c86d048c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -24,6 +24,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.Transports; @@ -36,11 +38,9 @@ import org.elasticsearch.transport.Transports; final class Netty4MessageChannelHandler extends ChannelDuplexHandler { private final Netty4Transport transport; - private final String profileName; - Netty4MessageChannelHandler(Netty4Transport transport, String profileName) { + Netty4MessageChannelHandler(Netty4Transport transport) { this.transport = transport; - this.profileName = profileName; } @Override @@ -58,7 +58,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh // buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize); - Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); + Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); transport.messageReceived(reference, channelAttribute.get()); } finally { // Set the expected position of the buffer, no matter what happened @@ -69,7 +69,13 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Netty4Utils.maybeDie(cause); - transport.exceptionCaught(ctx, cause); + final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); + final Throwable newCause = unwrapped != null ? unwrapped : cause; + Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); + if (newCause instanceof Error) { + transport.onException(tcpChannel, new Exception(newCause)); + } else { + transport.onException(tcpChannel, (Exception) newCause); + } } - } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java similarity index 96% rename from modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java rename to modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 89fabdcd763..78a14255000 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -30,13 +30,13 @@ import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; -public class NettyTcpChannel implements TcpChannel { +public class Netty4TcpChannel implements TcpChannel { private final Channel channel; private final String profile; private final CompletableContext closeContext = new CompletableContext<>(); - NettyTcpChannel(Channel channel, String profile) { + Netty4TcpChannel(Channel channel, String profile) { this.channel = channel; this.profile = profile; this.channel.closeFuture().addListener(f -> { @@ -118,7 +118,7 @@ public class NettyTcpChannel implements TcpChannel { @Override public String toString() { - return "NettyTcpChannel{" + + return "Netty4TcpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + channel.remoteAddress() + '}'; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java new file mode 100644 index 00000000000..873a6c33fba --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.netty4; + +import io.netty.channel.Channel; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.concurrent.CompletableContext; +import org.elasticsearch.transport.TcpServerChannel; + +import java.net.InetSocketAddress; + +public class Netty4TcpServerChannel implements TcpServerChannel { + + private final Channel channel; + private final String profile; + private final CompletableContext closeContext = new CompletableContext<>(); + + Netty4TcpServerChannel(Channel channel, String profile) { + this.channel = channel; + this.profile = profile; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); + } + + @Override + public String getProfile() { + return profile; + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public String toString() { + return "Netty4TcpChannel{" + + "localAddress=" + getLocalAddress() + + '}'; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 466c4b68bfa..c8c6fceb543 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -37,8 +38,6 @@ import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; @@ -196,6 +195,7 @@ public class Netty4Transport extends TcpTransport { serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(getServerChannelInitializer(name)); + serverBootstrap.handler(new ServerChannelExceptionHandler()); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive); @@ -226,17 +226,11 @@ public class Netty4Transport extends TcpTransport { return new ClientChannelInitializer(); } - static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); - - protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); - final Throwable t = unwrapped != null ? unwrapped : cause; - Channel channel = ctx.channel(); - onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t)); - } + static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); + static final AttributeKey SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel"); @Override - protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { + protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { ChannelFuture channelFuture = bootstrap.connect(address); Channel channel = channelFuture.channel(); if (channel == null) { @@ -245,7 +239,7 @@ public class Netty4Transport extends TcpTransport { } addClosedExceptionLogger(channel); - NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default"); + Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default"); channel.attr(CHANNEL_KEY).set(nettyChannel); channelFuture.addListener(f -> { @@ -266,10 +260,10 @@ public class Netty4Transport extends TcpTransport { } @Override - protected NettyTcpChannel bind(String name, InetSocketAddress address) { + protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel(); - NettyTcpChannel esChannel = new NettyTcpChannel(channel, name); - channel.attr(CHANNEL_KEY).set(esChannel); + Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel, name); + channel.attr(SERVER_CHANNEL_KEY).set(esChannel); return esChannel; } @@ -310,7 +304,7 @@ public class Netty4Transport extends TcpTransport { ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this)); } @Override @@ -331,11 +325,11 @@ public class Netty4Transport extends TcpTransport { @Override protected void initChannel(Channel ch) throws Exception { addClosedExceptionLogger(ch); - NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name); + Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this)); serverAcceptedChannel(nettyTcpChannel); } @@ -353,4 +347,19 @@ public class Netty4Transport extends TcpTransport { } }); } + + @ChannelHandler.Sharable + private class ServerChannelExceptionHandler extends ChannelHandlerAdapter { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Netty4Utils.maybeDie(cause); + Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + onServerException(serverChannel, new Exception(cause)); + } else { + onServerException(serverChannel, (Exception) cause); + } + } + } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 7343da6c3b1..4c783cf0787 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -70,7 +70,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase { nettyTransport.start(); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); - TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses); + TransportAddress transportAddress = randomFrom(boundAddresses); port = transportAddress.address().getPort(); host = transportAddress.address().getAddress(); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java new file mode 100644 index 00000000000..2674d38dc49 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.http.HttpServerChannel; +import org.elasticsearch.nio.NioServerSocketChannel; + +import java.io.IOException; +import java.nio.channels.ServerSocketChannel; + +public class NioHttpServerChannel extends NioServerSocketChannel implements HttpServerChannel { + + NioHttpServerChannel(ServerSocketChannel serverSocketChannel) throws IOException { + super(serverSocketChannel); + } + + @Override + public void addCloseListener(ActionListener listener) { + addCloseListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public String toString() { + return "NioHttpServerChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index aa0859e6146..b80778e9642 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -21,40 +21,29 @@ package org.elasticsearch.http.nio; import io.netty.handler.codec.http.HttpMethod; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; -import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpChannel; +import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.HttpServerTransport; -import org.elasticsearch.http.HttpStats; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioChannel; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; -import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; @@ -62,18 +51,11 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -113,7 +95,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private final int tcpSendBufferSize; private final int tcpReceiveBufferSize; - private final Set serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private NioGroup nioGroup; private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; @@ -156,12 +137,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount, (s) -> new EventHandler(this::onNonChannelException, s)); channelFactory = new HttpChannelFactory(); - this.boundAddress = createBoundHttpAddress(); - - if (logger.isInfoEnabled()) { - logger.info("{}", boundAddress); - } - + bindServer(); success = true; } catch (IOException e) { throw new ElasticsearchException(e); @@ -173,26 +149,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { } @Override - protected void doStop() { - synchronized (serverChannels) { - if (serverChannels.isEmpty() == false) { - try { - closeChannels(new ArrayList<>(serverChannels)); - } catch (Exception e) { - logger.error("unexpected exception while closing http server channels", e); - } - serverChannels.clear(); - } - } - - // TODO: Move all of channel closing to abstract class once server channels are handled - try { - CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); - } catch (Exception e) { - logger.warn("unexpected exception while closing http channels", e); - } - httpChannels.clear(); - + protected void stopInternal() { try { nioGroup.close(); } catch (Exception e) { @@ -201,40 +158,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { } @Override - protected void doClose() throws IOException { - } - - @Override - protected TransportAddress bindAddress(InetAddress hostAddress) { - final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = port.iterate(portNumber -> { - try { - synchronized (serverChannels) { - InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber); - NioServerSocketChannel channel = nioGroup.bindServerChannel(address, channelFactory); - serverChannels.add(channel); - boundSocket.set(channel.getLocalAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - }); - if (success == false) { - throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); - } - return new TransportAddress(boundSocket.get()); - } - - @Override - public HttpStats stats() { - return new HttpStats(serverChannels.size(), totalChannelsAccepted.get()); + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOException { + return nioGroup.bindServerChannel(socketAddress, channelFactory); } static NioCorsConfig buildCorsConfig(Settings settings) { @@ -269,33 +194,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { .build(); } - private void closeChannels(List channels) { - List> futures = new ArrayList<>(channels.size()); - - for (NioChannel channel : channels) { - PlainActionFuture future = PlainActionFuture.newFuture(); - channel.addCloseListener(ActionListener.toBiConsumer(future)); - futures.add(future); - channel.close(); - } - - List closeExceptions = new ArrayList<>(); - for (ActionFuture f : futures) { - try { - f.actionGet(); - } catch (RuntimeException e) { - closeExceptions.add(e); - } - } - - ExceptionsHelper.rethrowAndSuppress(closeExceptions); - } - private void acceptChannel(NioSocketChannel socketChannel) { super.serverAcceptedChannel((HttpChannel) socketChannel); } - private class HttpChannelFactory extends ChannelFactory { + private class HttpChannelFactory extends ChannelFactory { private HttpChannelFactory() { super(new RawChannelFactory(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize)); @@ -303,29 +206,28 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { @Override public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { - NioHttpChannel nioChannel = new NioHttpChannel(channel); + NioHttpChannel httpChannel = new NioHttpChannel(channel); java.util.function.Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; - HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, + HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this, handlingSettings, corsConfig); - Consumer exceptionHandler = (e) -> onException(nioChannel, e); - SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, + Consumer exceptionHandler = (e) -> onException(httpChannel, e); + SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline, new InboundChannelBuffer(pageSupplier)); - nioChannel.setContext(context); - return nioChannel; + httpChannel.setContext(context); + return httpChannel; } @Override - public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + public NioHttpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioHttpServerChannel httpServerChannel = new NioHttpServerChannel(channel); + Consumer exceptionHandler = (e) -> onServerException(httpServerChannel, e); Consumer acceptor = NioHttpServerTransport.this::acceptChannel; - ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); - nioChannel.setContext(context); - return nioChannel; + ServerChannelContext context = new ServerChannelContext(httpServerChannel, this, selector, acceptor, exceptionHandler); + httpServerChannel.setContext(context); + return httpServerChannel; } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java index 10bf4ed7523..3c6d4b12df9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java @@ -20,19 +20,17 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.nio.NioServerSocketChannel; -import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TcpServerChannel; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; /** - * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel} + * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpServerChannel} * interface. As it is a server socket, setting SO_LINGER and sending messages is not supported. */ -public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel { +public class NioTcpServerChannel extends NioServerSocketChannel implements TcpServerChannel { private final String profile; @@ -41,21 +39,6 @@ public class NioTcpServerChannel extends NioServerSocketChannel implements TcpCh this.profile = profile; } - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - throw new UnsupportedOperationException("Cannot send a message to a server channel."); - } - - @Override - public void setSoLinger(int value) throws IOException { - throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - @Override public void close() { getContext().closeChannel(); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index cf7d37493cb..47229a0df2f 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,7 +19,6 @@ package org.elasticsearch.transport.nio; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -176,8 +175,7 @@ public class NioTransport extends TcpTransport { @Override public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + Consumer exceptionHandler = (e) -> onServerException(nioChannel, e); Consumer acceptor = NioTransport.this::acceptChannel; ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context); diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 9d9008f7fb8..622020d6451 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.network.CloseableChannel; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -53,6 +54,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; @@ -74,9 +76,10 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo private final String[] bindHosts; private final String[] publishHosts; - protected final AtomicLong totalChannelsAccepted = new AtomicLong(); - protected final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - protected volatile BoundTransportAddress boundAddress; + private volatile BoundTransportAddress boundAddress; + private final AtomicLong totalChannelsAccepted = new AtomicLong(); + private final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { @@ -116,7 +119,12 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo return new HttpInfo(boundTransportAddress, maxContentLength.getBytes()); } - protected BoundTransportAddress createBoundHttpAddress() { + @Override + public HttpStats stats() { + return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); + } + + protected void bindServer() { // Bind and start to accept incoming connections. InetAddress hostAddresses[]; try { @@ -138,11 +146,71 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo } final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress); - final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort); - return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), new TransportAddress(publishAddress)); + TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); + this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress); + logger.info("{}", boundAddress); } - protected abstract TransportAddress bindAddress(InetAddress hostAddress); + private TransportAddress bindAddress(final InetAddress hostAddress) { + final AtomicReference lastException = new AtomicReference<>(); + final AtomicReference boundSocket = new AtomicReference<>(); + boolean success = port.iterate(portNumber -> { + try { + synchronized (httpServerChannels) { + HttpServerChannel httpServerChannel = bind(new InetSocketAddress(hostAddress, portNumber)); + httpServerChannels.add(httpServerChannel); + boundSocket.set(httpServerChannel.getLocalAddress()); + } + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + }); + if (!success) { + throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); + } + + if (logger.isDebugEnabled()) { + logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); + } + return new TransportAddress(boundSocket.get()); + } + + protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception; + + @Override + protected void doStop() { + synchronized (httpServerChannels) { + if (httpServerChannels.isEmpty() == false) { + try { + CloseableChannel.closeChannels(new ArrayList<>(httpServerChannels), true); + } catch (Exception e) { + logger.warn("exception while closing channels", e); + } finally { + httpServerChannels.clear(); + } + } + } + + try { + CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); + } catch (Exception e) { + logger.warn("unexpected exception while closing http channels", e); + } + httpChannels.clear(); + + stopInternal(); + } + + @Override + protected void doClose() { + } + + /** + * Called to tear down internal resources + */ + protected abstract void stopInternal(); // package private for tests static int resolvePublishPort(Settings settings, List boundAddresses, InetAddress publishInetAddress) { @@ -197,19 +265,23 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo CloseableChannel.closeChannel(channel); } else { logger.warn(() -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", channel), e); + "caught exception while handling client http traffic, closing connection {}", channel), e); CloseableChannel.closeChannel(channel); } } + protected void onServerException(HttpServerChannel channel, Exception e) { + logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e); + } + /** * Exception handler for exceptions that are not associated with a specific channel. * * @param exception the exception */ protected void onNonChannelException(Exception exception) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), - exception); + String threadName = Thread.currentThread().getName(); + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception); } protected void serverAcceptedChannel(HttpChannel httpChannel) { diff --git a/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java b/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java new file mode 100644 index 00000000000..e4222ae8168 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.common.network.CloseableChannel; + +import java.net.InetSocketAddress; + +public interface HttpServerChannel extends CloseableChannel { + + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java new file mode 100644 index 00000000000..408ec1af20b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.network.CloseableChannel; + +import java.net.InetSocketAddress; + + +/** + * This is a tcp channel representing a server channel listening for new connections. It is the server + * channel abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport + * implementations must return server channels that adhere to the required method contracts. + */ +public interface TcpServerChannel extends CloseableChannel { + + /** + * This returns the profile for this channel. + */ + String getProfile(); + + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); + +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index bd862c19e9c..c8f256c2db8 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -21,9 +21,6 @@ package org.elasticsearch.transport; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; @@ -31,6 +28,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -52,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -68,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.rest.RestStatus; @@ -210,7 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); // node id to actual channel private final ConcurrentMap connectedNodes = newConcurrentMap(); - private final Map> serverChannels = newConcurrentMap(); + private final Map> serverChannels = newConcurrentMap(); private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final KeyedLock connectionLock = new KeyedLock<>(); @@ -792,9 +792,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final AtomicReference boundSocket = new AtomicReference<>(); boolean success = portsRange.iterate(portNumber -> { try { - TcpChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); + TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); synchronized (serverChannels) { - List list = serverChannels.get(name); + List list = serverChannels.get(name); if (list == null) { list = new ArrayList<>(); serverChannels.put(name, list); @@ -957,9 +957,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements closeLock.writeLock().lock(); try { // first stop to accept any incoming connections so nobody can connect to this transport - for (Map.Entry> entry : serverChannels.entrySet()) { + for (Map.Entry> entry : serverChannels.entrySet()) { String profile = entry.getKey(); - List channels = entry.getValue(); + List channels = entry.getValue(); ActionListener closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); channels.forEach(c -> c.addCloseListener(closeFailLogger)); @@ -999,7 +999,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - protected void onException(TcpChannel channel, Exception e) { + public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); @@ -1049,6 +1049,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } + protected void onServerException(TcpServerChannel channel, Exception e) { + logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e); + } + /** * Exception handler for exceptions that are not associated with a specific channel. * @@ -1072,7 +1076,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * @param name the profile name * @param address the address to bind to */ - protected abstract TcpChannel bind(String name, InetSocketAddress address) throws IOException; + protected abstract TcpServerChannel bind(String name, InetSocketAddress address) throws IOException; /** * Initiate a single tcp socket channel. @@ -1087,8 +1091,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements /** * Called to tear down internal resources */ - protected void stopInternal() { - } + protected abstract void stopInternal(); public boolean canCompress(TransportRequest request) { return compress && (!(request instanceof BytesTransportRequest)); diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index a7629e5f48b..ece9fd503c1 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -35,8 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; -import java.io.IOException; -import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -128,8 +127,9 @@ public class AbstractHttpServerTransportTests extends ESTestCase { try (AbstractHttpServerTransport transport = new AbstractHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher) { + @Override - protected TransportAddress bindAddress(InetAddress hostAddress) { + protected HttpServerChannel bind(InetSocketAddress hostAddress) { return null; } @@ -139,12 +139,7 @@ public class AbstractHttpServerTransportTests extends ESTestCase { } @Override - protected void doStop() { - - } - - @Override - protected void doClose() throws IOException { + protected void stopInternal() { } diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 2328aa46363..d16300bf266 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -193,6 +193,10 @@ public class TcpTransportTests extends ESTestCase { return new FakeChannel(messageCaptor); } + @Override + protected void stopInternal() { + } + @Override public NodeChannels getConnection(DiscoveryNode node) { int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections(); @@ -237,7 +241,7 @@ public class TcpTransportTests extends ESTestCase { } } - private static final class FakeChannel implements TcpChannel { + private static final class FakeChannel implements TcpChannel, TcpServerChannel { private final AtomicReference messageCaptor; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 8831c46c011..bbff340c860 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -225,7 +225,7 @@ public class MockTcpTransport extends TcpTransport { socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); } - public final class MockChannel implements Closeable, TcpChannel { + public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel { private final AtomicBoolean isOpen = new AtomicBoolean(true); private final InetSocketAddress localAddress; private final ServerSocket serverSocket; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index cb9e243660a..2ab8719c334 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TcpServerChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; @@ -191,7 +192,7 @@ public class MockNioTransport extends TcpTransport { } } - private static class MockServerChannel extends NioServerSocketChannel implements TcpChannel { + private static class MockServerChannel extends NioServerSocketChannel implements TcpServerChannel { private final String profile; @@ -215,21 +216,6 @@ public class MockNioTransport extends TcpTransport { public void addCloseListener(ActionListener listener) { addCloseListener(ActionListener.toBiConsumer(listener)); } - - @Override - public void setSoLinger(int value) throws IOException { - throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - throw new UnsupportedOperationException("Cannot send a message to a server channel."); - } } private static class MockSocketChannel extends NioSocketChannel implements TcpChannel { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index ce06712722c..b761439b15b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -109,7 +109,7 @@ public class SecurityNetty4Transport extends Netty4Transport { } @Override - protected void onException(TcpChannel channel, Exception e) { + public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index 161ac3678ae..9427812ba13 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -24,7 +24,7 @@ import org.elasticsearch.transport.TcpTransportChannel; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.netty4.NettyTcpChannel; +import org.elasticsearch.transport.netty4.Netty4TcpChannel; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.user.KibanaUser; @@ -116,8 +116,8 @@ public interface ServerTransportFilter { } if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel) && - ((TcpTransportChannel) unwrappedChannel).getChannel() instanceof NettyTcpChannel) { - Channel channel = ((NettyTcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel(); + ((TcpTransportChannel) unwrappedChannel).getChannel() instanceof Netty4TcpChannel) { + Channel channel = ((Netty4TcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel(); SslHandler sslHandler = channel.pipeline().get(SslHandler.class); if (channel.isOpen()) { assert sslHandler != null : "channel [" + channel + "] did not have a ssl handler. pipeline " + channel.pipeline(); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 5315a944f77..fd1b1198607 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.security.transport.nio; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; @@ -131,9 +130,8 @@ public class SecurityNioTransport extends NioTransport { @Override public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);; + Consumer exceptionHandler = (e) -> onServerException(nioChannel, e); Consumer acceptor = SecurityNioTransport.this::acceptChannel; ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context);