From 0461e12663b73c17792a9942460dd65c5d3d45d0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 2 Aug 2016 16:43:28 -0400 Subject: [PATCH] Simplify Netty 4 transport implementations The Netty 4 transport implementations have an unnecessary dependency on SocketChannels, and can instead just use plain Channels. --- .../http/netty4/Netty4HttpServerTransport.java | 4 ++-- .../transport/netty4/Netty4Transport.java | 14 ++++++++------ .../netty4/Netty4HttpServerPipeliningTests.java | 3 ++- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index f95d9534f7a..7472d87209e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -525,7 +525,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext()); } - protected static class HttpChannelHandler extends ChannelInitializer { + protected static class HttpChannelHandler extends ChannelInitializer { private final Netty4HttpServerTransport transport; private final Netty4HttpRequestHandler requestHandler; @@ -539,7 +539,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("openChannels", transport.serverOpenChannels); final HttpRequestDecoder decoder = new HttpRequestDecoder( Math.toIntExact(transport.maxInitialLineLength.bytes()), diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d6ddc545783..d7631acd6b7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -310,11 +311,11 @@ public class Netty4Transport extends TcpTransport { serverBootstraps.put(name, serverBootstrap); } - protected ChannelInitializer getServerChannelInitializer(String name, Settings settings) { + protected ChannelHandler getServerChannelInitializer(String name, Settings settings) { return new ServerChannelInitializer(name, settings); } - protected ChannelInitializer getClientChannelInitializer() { + protected ChannelHandler getClientChannelInitializer() { return new ClientChannelInitializer(); } @@ -506,10 +507,10 @@ public class Netty4Transport extends TcpTransport { }); } - protected class ClientChannelInitializer extends ChannelInitializer { + protected class ClientChannelInitializer extends ChannelInitializer { @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); // using a dot as a prefix means this cannot come from any settings parsed ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); @@ -517,7 +518,7 @@ public class Netty4Transport extends TcpTransport { } - protected class ServerChannelInitializer extends ChannelInitializer { + protected class ServerChannelInitializer extends ChannelInitializer { protected final String name; protected final Settings settings; @@ -528,10 +529,11 @@ public class Netty4Transport extends TcpTransport { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); } } + } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 4d94dc2ccaf..155bbe4bb5b 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.http.netty4; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -179,7 +180,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { super.initChannel(ch); ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService)); }