diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index c1b2ef10211..9dc3291693c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -199,16 +199,7 @@ public class Netty4Transport extends TcpTransport { bootstrap.channel(NioSocketChannel.class); } - bootstrap.handler(new ChannelInitializer() { - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); - } - - }); + bootstrap.handler(getClientChannelInitializer()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis())); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); @@ -292,14 +283,7 @@ public class Netty4Transport extends TcpTransport { serverBootstrap.channel(NioServerSocketChannel.class); } - serverBootstrap.childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); - ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); - } - }); + serverBootstrap.childHandler(getServerChannelInitializer(name, settings)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); @@ -326,6 +310,14 @@ public class Netty4Transport extends TcpTransport { serverBootstraps.put(name, serverBootstrap); } + protected ChannelInitializer getServerChannelInitializer(String name, Settings settings) { + return new ServerChannelInitializer(name, settings); + } + + protected ChannelInitializer getClientChannelInitializer() { + return new ClientChannelInitializer(); + } + protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); final Throwable t = unwrapped != null ? unwrapped : cause; @@ -348,7 +340,9 @@ public class Netty4Transport extends TcpTransport { Channel[] channels = new Channel[1]; channels[0] = connect.channel(); channels[0].closeFuture().addListener(new ChannelCloseListener(node)); - return new NodeChannels(channels, channels, channels, channels, channels); + NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels); + onAfterChannelsConnected(nodeChannels); + return nodeChannels; } protected NodeChannels connectToChannels(DiscoveryNode node) { @@ -409,6 +403,7 @@ public class Netty4Transport extends TcpTransport { } throw e; } + onAfterChannelsConnected(nodeChannels); success = true; } finally { if (success == false) { @@ -422,6 +417,9 @@ public class Netty4Transport extends TcpTransport { return nodeChannels; } + protected void onAfterChannelsConnected(NodeChannels nodeChannels) { + + } private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; @@ -503,4 +501,32 @@ public class Netty4Transport extends TcpTransport { }); } + protected class ClientChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); + // using a dot as a prefix means this cannot come from any settings parsed + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); + } + + } + + protected class ServerChannelInitializer extends ChannelInitializer { + + protected final String name; + protected final Settings settings; + + protected ServerChannelInitializer(String name, Settings settings) { + this.name = name; + this.settings = settings; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); + ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); + } + } }