From 849847078142a394e3386f6bd5cf929affd5a52a Mon Sep 17 00:00:00 2001 From: jaymode Date: Tue, 2 Aug 2016 13:45:41 -0400 Subject: [PATCH 1/5] update transport to allow for extensions --- .../transport/netty4/Netty4Transport.java | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index c1b2ef10211..9dc3291693c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -199,16 +199,7 @@ public class Netty4Transport extends TcpTransport { bootstrap.channel(NioSocketChannel.class); } - bootstrap.handler(new ChannelInitializer() { - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); - } - - }); + bootstrap.handler(getClientChannelInitializer()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis())); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); @@ -292,14 +283,7 @@ public class Netty4Transport extends TcpTransport { serverBootstrap.channel(NioServerSocketChannel.class); } - serverBootstrap.childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); - ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); - } - }); + serverBootstrap.childHandler(getServerChannelInitializer(name, settings)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); @@ -326,6 +310,14 @@ public class Netty4Transport extends TcpTransport { serverBootstraps.put(name, serverBootstrap); } + protected ChannelInitializer getServerChannelInitializer(String name, Settings settings) { + return new ServerChannelInitializer(name, settings); + } + + protected ChannelInitializer getClientChannelInitializer() { + return new ClientChannelInitializer(); + } + protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); final Throwable t = unwrapped != null ? unwrapped : cause; @@ -348,7 +340,9 @@ public class Netty4Transport extends TcpTransport { Channel[] channels = new Channel[1]; channels[0] = connect.channel(); channels[0].closeFuture().addListener(new ChannelCloseListener(node)); - return new NodeChannels(channels, channels, channels, channels, channels); + NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels); + onAfterChannelsConnected(nodeChannels); + return nodeChannels; } protected NodeChannels connectToChannels(DiscoveryNode node) { @@ -409,6 +403,7 @@ public class Netty4Transport extends TcpTransport { } throw e; } + onAfterChannelsConnected(nodeChannels); success = true; } finally { if (success == false) { @@ -422,6 +417,9 @@ public class Netty4Transport extends TcpTransport { return nodeChannels; } + protected void onAfterChannelsConnected(NodeChannels nodeChannels) { + + } private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; @@ -503,4 +501,32 @@ public class Netty4Transport extends TcpTransport { }); } + protected class ClientChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); + // using a dot as a prefix means this cannot come from any settings parsed + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); + } + + } + + protected class ServerChannelInitializer extends ChannelInitializer { + + protected final String name; + protected final Settings settings; + + protected ServerChannelInitializer(String name, Settings settings) { + this.name = name; + this.settings = settings; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); + ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); + } + } } From 00ca6c417ed171fe22756fa5e39bf6da05bfe170 Mon Sep 17 00:00:00 2001 From: jaymode Date: Tue, 2 Aug 2016 14:06:29 -0400 Subject: [PATCH 2/5] add javadocs --- .../elasticsearch/transport/netty4/Netty4Transport.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 9dc3291693c..d6ddc545783 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -417,9 +417,14 @@ public class Netty4Transport extends TcpTransport { return nodeChannels; } + /** + * Allows for logic to be executed after a connection has been made on all channels. While this method is being executed, the node is + * not listed as being connected to. + * @param nodeChannels the {@link NodeChannels} that have been connected + */ protected void onAfterChannelsConnected(NodeChannels nodeChannels) { - } + private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; From 669daccfbb8f567ed77a69d4f3f1a348b8f7254d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 2 Aug 2016 14:09:05 -0400 Subject: [PATCH 3/5] Allow for Netty 4 HTTP extensions This commit enables the Netty 4 HTTP server implementation to allow for extensions. --- .../elasticsearch/http/netty4/Netty4HttpServerTransport.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 0d4a6ab5ee1..f95d9534f7a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -525,12 +525,12 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext()); } - static class HttpChannelHandler extends ChannelInitializer { + protected static class HttpChannelHandler extends ChannelInitializer { private final Netty4HttpServerTransport transport; private final Netty4HttpRequestHandler requestHandler; - HttpChannelHandler( + protected HttpChannelHandler( final Netty4HttpServerTransport transport, final boolean detailedErrorsEnabled, final ThreadContext threadContext) { From 6def10c5d9ee0747d42ee777e7ae27968fd69675 Mon Sep 17 00:00:00 2001 From: jaymode Date: Tue, 2 Aug 2016 14:46:44 -0400 Subject: [PATCH 4/5] make netty4 http request public --- .../java/org/elasticsearch/http/netty4/Netty4HttpRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 7825e3ebe1c..2e511d15622 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -34,7 +34,7 @@ import java.net.SocketAddress; import java.util.HashMap; import java.util.Map; -class Netty4HttpRequest extends RestRequest { +public class Netty4HttpRequest extends RestRequest { private final FullHttpRequest request; private final Channel channel; From 0461e12663b73c17792a9942460dd65c5d3d45d0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 2 Aug 2016 16:43:28 -0400 Subject: [PATCH 5/5] Simplify Netty 4 transport implementations The Netty 4 transport implementations have an unnecessary dependency on SocketChannels, and can instead just use plain Channels. --- .../http/netty4/Netty4HttpServerTransport.java | 4 ++-- .../transport/netty4/Netty4Transport.java | 14 ++++++++------ .../netty4/Netty4HttpServerPipeliningTests.java | 3 ++- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index f95d9534f7a..7472d87209e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -525,7 +525,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext()); } - protected static class HttpChannelHandler extends ChannelInitializer { + protected static class HttpChannelHandler extends ChannelInitializer { private final Netty4HttpServerTransport transport; private final Netty4HttpRequestHandler requestHandler; @@ -539,7 +539,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("openChannels", transport.serverOpenChannels); final HttpRequestDecoder decoder = new HttpRequestDecoder( Math.toIntExact(transport.maxInitialLineLength.bytes()), diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d6ddc545783..d7631acd6b7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -310,11 +311,11 @@ public class Netty4Transport extends TcpTransport { serverBootstraps.put(name, serverBootstrap); } - protected ChannelInitializer getServerChannelInitializer(String name, Settings settings) { + protected ChannelHandler getServerChannelInitializer(String name, Settings settings) { return new ServerChannelInitializer(name, settings); } - protected ChannelInitializer getClientChannelInitializer() { + protected ChannelHandler getClientChannelInitializer() { return new ClientChannelInitializer(); } @@ -506,10 +507,10 @@ public class Netty4Transport extends TcpTransport { }); } - protected class ClientChannelInitializer extends ChannelInitializer { + protected class ClientChannelInitializer extends ChannelInitializer { @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); // using a dot as a prefix means this cannot come from any settings parsed ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); @@ -517,7 +518,7 @@ public class Netty4Transport extends TcpTransport { } - protected class ServerChannelInitializer extends ChannelInitializer { + protected class ServerChannelInitializer extends ChannelInitializer { protected final String name; protected final Settings settings; @@ -528,10 +529,11 @@ public class Netty4Transport extends TcpTransport { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); } } + } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 4d94dc2ccaf..155bbe4bb5b 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.http.netty4; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -179,7 +180,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { super.initChannel(ch); ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService)); }