update transport to allow for extensions
This commit is contained in:
parent
227463c356
commit
8498470781
|
@ -199,16 +199,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
bootstrap.channel(NioSocketChannel.class);
|
bootstrap.channel(NioSocketChannel.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
|
bootstrap.handler(getClientChannelInitializer());
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void initChannel(SocketChannel ch) throws Exception {
|
|
||||||
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
|
||||||
// using a dot as a prefix means this cannot come from any settings parsed
|
|
||||||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
|
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
|
||||||
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
|
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
|
||||||
|
@ -292,14 +283,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
serverBootstrap.channel(NioServerSocketChannel.class);
|
serverBootstrap.channel(NioServerSocketChannel.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
|
serverBootstrap.childHandler(getServerChannelInitializer(name, settings));
|
||||||
@Override
|
|
||||||
protected void initChannel(SocketChannel ch) throws Exception {
|
|
||||||
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
|
|
||||||
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
|
||||||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
|
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
|
||||||
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
|
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
|
||||||
|
@ -326,6 +310,14 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
serverBootstraps.put(name, serverBootstrap);
|
serverBootstraps.put(name, serverBootstrap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ChannelInitializer<SocketChannel> getServerChannelInitializer(String name, Settings settings) {
|
||||||
|
return new ServerChannelInitializer(name, settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ChannelInitializer<SocketChannel> getClientChannelInitializer() {
|
||||||
|
return new ClientChannelInitializer();
|
||||||
|
}
|
||||||
|
|
||||||
protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
|
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
|
||||||
final Throwable t = unwrapped != null ? unwrapped : cause;
|
final Throwable t = unwrapped != null ? unwrapped : cause;
|
||||||
|
@ -348,7 +340,9 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
Channel[] channels = new Channel[1];
|
Channel[] channels = new Channel[1];
|
||||||
channels[0] = connect.channel();
|
channels[0] = connect.channel();
|
||||||
channels[0].closeFuture().addListener(new ChannelCloseListener(node));
|
channels[0].closeFuture().addListener(new ChannelCloseListener(node));
|
||||||
return new NodeChannels(channels, channels, channels, channels, channels);
|
NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels);
|
||||||
|
onAfterChannelsConnected(nodeChannels);
|
||||||
|
return nodeChannels;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected NodeChannels connectToChannels(DiscoveryNode node) {
|
protected NodeChannels connectToChannels(DiscoveryNode node) {
|
||||||
|
@ -409,6 +403,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
onAfterChannelsConnected(nodeChannels);
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (success == false) {
|
if (success == false) {
|
||||||
|
@ -422,6 +417,9 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
return nodeChannels;
|
return nodeChannels;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void onAfterChannelsConnected(NodeChannels nodeChannels) {
|
||||||
|
|
||||||
|
}
|
||||||
private class ChannelCloseListener implements ChannelFutureListener {
|
private class ChannelCloseListener implements ChannelFutureListener {
|
||||||
|
|
||||||
private final DiscoveryNode node;
|
private final DiscoveryNode node;
|
||||||
|
@ -503,4 +501,32 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
||||||
|
// using a dot as a prefix means this cannot come from any settings parsed
|
||||||
|
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||||
|
|
||||||
|
protected final String name;
|
||||||
|
protected final Settings settings;
|
||||||
|
|
||||||
|
protected ServerChannelInitializer(String name, Settings settings) {
|
||||||
|
this.name = name;
|
||||||
|
this.settings = settings;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) throws Exception {
|
||||||
|
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
|
||||||
|
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
||||||
|
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue