Simplify Netty 4 transport implementations
The Netty 4 transport implementations have an unnecessary dependency on SocketChannels, and can instead just use plain Channels.
This commit is contained in:
parent
6def10c5d9
commit
0461e12663
|
@ -525,7 +525,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext());
|
||||
}
|
||||
|
||||
protected static class HttpChannelHandler extends ChannelInitializer<SocketChannel> {
|
||||
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
|
||||
|
||||
private final Netty4HttpServerTransport transport;
|
||||
private final Netty4HttpRequestHandler requestHandler;
|
||||
|
@ -539,7 +539,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
|
||||
final HttpRequestDecoder decoder = new HttpRequestDecoder(
|
||||
Math.toIntExact(transport.maxInitialLineLength.bytes()),
|
||||
|
|
|
@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
|
|||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
|
@ -310,11 +311,11 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
serverBootstraps.put(name, serverBootstrap);
|
||||
}
|
||||
|
||||
protected ChannelInitializer<SocketChannel> getServerChannelInitializer(String name, Settings settings) {
|
||||
protected ChannelHandler getServerChannelInitializer(String name, Settings settings) {
|
||||
return new ServerChannelInitializer(name, settings);
|
||||
}
|
||||
|
||||
protected ChannelInitializer<SocketChannel> getClientChannelInitializer() {
|
||||
protected ChannelHandler getClientChannelInitializer() {
|
||||
return new ClientChannelInitializer();
|
||||
}
|
||||
|
||||
|
@ -506,10 +507,10 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
});
|
||||
}
|
||||
|
||||
protected class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
||||
// using a dot as a prefix means this cannot come from any settings parsed
|
||||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
|
||||
|
@ -517,7 +518,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
|
||||
}
|
||||
|
||||
protected class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
protected class ServerChannelInitializer extends ChannelInitializer<Channel> {
|
||||
|
||||
protected final String name;
|
||||
protected final Settings settings;
|
||||
|
@ -528,10 +529,11 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
|
||||
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
||||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.http.netty4;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
@ -179,7 +180,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
super.initChannel(ch);
|
||||
ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue