Fix NPE when initializing an accepted socket in NettyTransport.

NettyTransport's ChannelPipelineFactory uses the instance variable
serverOpenChannels in order to create sockets. However, this instance variable
is set to null when stoping the netty transport, so if the transport tries to
stop and to initialize a socket at the same time you might hit the following
NullPointerException:

[2014-05-13 07:33:47,616][WARN ][netty.channel.socket.nio.AbstractNioSelector] Failed to initialize an accepted socket.
java.lang.NullPointerException: handler
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.<init>(DefaultChannelPipeline.java:725)
	at org.jboss.netty.channel.DefaultChannelPipeline.init(DefaultChannelPipeline.java:667)
	at org.jboss.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:96)
	at org.elasticsearch.transport.netty.NettyTransport$2.getPipeline(NettyTransport.java:327)
	at org.jboss.netty.channel.socket.nio.NioServerBoss.registerAcceptedChannel(NioServerBoss.java:134)
	at org.jboss.netty.channel.socket.nio.NioServerBoss.process(NioServerBoss.java:104)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
	at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

This fix ensures that the ChannelPipelineFactory always uses the channels that
have been used upon start, even if a stop request is issued concurrently.

Close #6144
This commit is contained in:
Adrien Grand 2014-05-13 11:04:23 +02:00
parent d8c02c2599
commit 3ad321fcb2
1 changed files with 3 additions and 2 deletions

View File

@ -308,7 +308,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return;
}
serverOpenChannels = new OpenChannelsHandler(logger);
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
@ -324,7 +325,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("openChannels", serverOpenChannels);
pipeline.addLast("openChannels", openChannels);
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {