From 4e04f95ab4d8305192c62bf4f2b06a32feaa212b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 20 Nov 2017 14:53:08 -0700 Subject: [PATCH] Fix issue where pages aren't released (#27459) This is related to #27422. Right now when we send a write to the netty transport, we attach a listener to the future. When you submit a write on the netty event loop and the event loop is shutdown, the onFailure method is called. Unfortunately, netty then tries to notify the listener which cannot be done without dispatching to the event loop. In this case, the dispatch fails and netty logs and error and does not tell us. This commit checks that netty is still not shutdown after sending a message. If netty is shutdown, we complete the listener. --- .../transport/netty4/NettyTcpChannel.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index fa9989f7270..3d71735a2a8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -22,13 +22,17 @@ package org.elasticsearch.transport.netty4; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; +import java.nio.channels.ClosedSelectorException; import java.util.concurrent.CompletableFuture; public class NettyTcpChannel implements TcpChannel { @@ -80,8 +84,8 @@ public class NettyTcpChannel implements TcpChannel { @Override public void sendMessage(BytesReference reference, ActionListener listener) { - final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference)); - future.addListener(f -> { + ChannelPromise writePromise = channel.newPromise(); + writePromise.addListener(f -> { if (f.isSuccess()) { listener.onResponse(null); } else { @@ -91,6 +95,11 @@ public class NettyTcpChannel implements TcpChannel { listener.onFailure((Exception) cause); } }); + channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); + + if (channel.eventLoop().isShutdown()) { + listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); + } } public Channel getLowLevelChannel() {