Merge pull request #19767 from jaymode/netty4

Enable Netty 4 extensions
This commit is contained in:
Jason Tedor 2016-08-03 14:10:43 -04:00 committed by GitHub
commit b12608a6db
4 changed files with 58 additions and 24 deletions

View File

@ -34,7 +34,7 @@ import java.net.SocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
class Netty4HttpRequest extends RestRequest { public class Netty4HttpRequest extends RestRequest {
private final FullHttpRequest request; private final FullHttpRequest request;
private final Channel channel; private final Channel channel;

View File

@ -525,12 +525,12 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext()); return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext());
} }
static class HttpChannelHandler extends ChannelInitializer<SocketChannel> { protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
private final Netty4HttpServerTransport transport; private final Netty4HttpServerTransport transport;
private final Netty4HttpRequestHandler requestHandler; private final Netty4HttpRequestHandler requestHandler;
HttpChannelHandler( protected HttpChannelHandler(
final Netty4HttpServerTransport transport, final Netty4HttpServerTransport transport,
final boolean detailedErrorsEnabled, final boolean detailedErrorsEnabled,
final ThreadContext threadContext) { final ThreadContext threadContext) {
@ -539,7 +539,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
} }
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("openChannels", transport.serverOpenChannels); ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
final HttpRequestDecoder decoder = new HttpRequestDecoder( final HttpRequestDecoder decoder = new HttpRequestDecoder(
Math.toIntExact(transport.maxInitialLineLength.bytes()), Math.toIntExact(transport.maxInitialLineLength.bytes()),

View File

@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
@ -199,16 +200,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
bootstrap.channel(NioSocketChannel.class); bootstrap.channel(NioSocketChannel.class);
} }
bootstrap.handler(new ChannelInitializer<SocketChannel>() { bootstrap.handler(getClientChannelInitializer());
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
}
});
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis())); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
@ -292,14 +284,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.channel(NioServerSocketChannel.class);
} }
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { serverBootstrap.childHandler(getServerChannelInitializer(name, settings));
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
}
});
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
@ -326,6 +311,14 @@ public class Netty4Transport extends TcpTransport<Channel> {
serverBootstraps.put(name, serverBootstrap); serverBootstraps.put(name, serverBootstrap);
} }
protected ChannelHandler getServerChannelInitializer(String name, Settings settings) {
return new ServerChannelInitializer(name, settings);
}
protected ChannelHandler getClientChannelInitializer() {
return new ClientChannelInitializer();
}
protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable t = unwrapped != null ? unwrapped : cause; final Throwable t = unwrapped != null ? unwrapped : cause;
@ -348,7 +341,9 @@ public class Netty4Transport extends TcpTransport<Channel> {
Channel[] channels = new Channel[1]; Channel[] channels = new Channel[1];
channels[0] = connect.channel(); channels[0] = connect.channel();
channels[0].closeFuture().addListener(new ChannelCloseListener(node)); channels[0].closeFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels, channels, channels); NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels);
onAfterChannelsConnected(nodeChannels);
return nodeChannels;
} }
protected NodeChannels connectToChannels(DiscoveryNode node) { protected NodeChannels connectToChannels(DiscoveryNode node) {
@ -409,6 +404,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
} }
throw e; throw e;
} }
onAfterChannelsConnected(nodeChannels);
success = true; success = true;
} finally { } finally {
if (success == false) { if (success == false) {
@ -422,6 +418,14 @@ public class Netty4Transport extends TcpTransport<Channel> {
return nodeChannels; return nodeChannels;
} }
/**
* Allows for logic to be executed after a connection has been made on all channels. While this method is being executed, the node is
* not listed as being connected to.
* @param nodeChannels the {@link NodeChannels} that have been connected
*/
protected void onAfterChannelsConnected(NodeChannels nodeChannels) {
}
private class ChannelCloseListener implements ChannelFutureListener { private class ChannelCloseListener implements ChannelFutureListener {
private final DiscoveryNode node; private final DiscoveryNode node;
@ -503,4 +507,33 @@ public class Netty4Transport extends TcpTransport<Channel> {
}); });
} }
protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
}
}
protected class ServerChannelInitializer extends ChannelInitializer<Channel> {
protected final String name;
protected final Settings settings;
protected ServerChannelInitializer(String name, Settings settings) {
this.name = name;
this.settings = settings;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
}
}
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
@ -179,7 +180,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
} }
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch); super.initChannel(ch);
ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService)); ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService));
} }