From b22bbf94da3f51910e265f76e29f885831faa8a4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 10 Jul 2017 10:50:51 -0500 Subject: [PATCH] Avoid blocking on channel close on network thread (#25521) Currently, when we close a channel in Netty4Utils.closeChannels, we block until the close is complete. This introduces the possibility that a network selector thread will block while waiting for a separate network selector thread to close a channel. For instance: T1 closes channel 1 (which is assigned to a selector running on T1). Channel 1's close listener executes the closing of the node. That means that T1 now tries to close channel 2. However, channel 2 is assigned to a selector that is running on T2. T1 now must wait until T2 closes that channel at some point in the future. This commit addresses the problem by adding a boolean parameter to closeChannels indicating whether we should block on close. We only set this boolean to true when we are closing down the server channels at shutdown; that call is never made from a network thread. When we call the closeChannels method with that boolean set to false, we do not block on close. 
--- .../elasticsearch/transport/TcpTransport.java | 19 +++-- .../transport/TCPTransportTests.java | 2 +- .../transport/netty4/Netty4Transport.java | 18 ++++- .../transport/MockTcpTransport.java | 3 +- .../transport/nio/NioTransport.java | 80 +++++++------------ 5 files changed, 56 insertions(+), 66 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 7e1d3422de9..689a54fc8da 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -442,7 +442,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList())); + closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false); } finally { transportServiceAdapter.onConnectionClosed(this); } @@ -640,7 +640,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final void closeChannelWhileHandlingExceptions(final Channel channel) { if (isOpen(channel)) { try { - closeChannels(Collections.singletonList(channel)); + closeChannels(Collections.singletonList(channel), false); } catch (IOException e) { logger.warn("failed to close channel", e); } @@ -902,7 +902,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i // first stop to accept any incoming connections so nobody can connect to this transport for (Map.Entry> entry : serverChannels.entrySet()) { try { - closeChannels(entry.getValue()); + closeChannels(entry.getValue(), true); } catch (Exception e) { logger.debug( (Supplier) () -> new ParameterizedMessage( @@ -975,7 +975,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override protected void 
innerInnerOnResponse(Channel channel) { try { - closeChannels(Collections.singletonList(channel)); + closeChannels(Collections.singletonList(channel), false); } catch (IOException e1) { logger.debug("failed to close httpOnTransport channel", e1); } @@ -984,7 +984,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override protected void innerOnFailure(Exception e) { try { - closeChannels(Collections.singletonList(channel)); + closeChannels(Collections.singletonList(channel), false); } catch (IOException e1) { e.addSuppressed(e1); logger.debug("failed to close httpOnTransport channel", e1); @@ -1015,9 +1015,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected abstract Channel bind(String name, InetSocketAddress address) throws IOException; /** - * Closes all channels in this list + * Closes all channels in this list. If the blocking boolean is set to true, the channels must be + * closed before the method returns. This should never be called with blocking set to true from a + * network thread. + * + * @param channels the channels to close + * @param blocking whether the channels should be closed synchronously */ - protected abstract void closeChannels(List channel) throws IOException; + protected abstract void closeChannels(List channels, boolean blocking) throws IOException; /** * Sends message to channel. 
The listener's onResponse method will be called when the send is complete unless an exception diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index 84bc377baca..32405485676 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -191,7 +191,7 @@ public class TCPTransportTests extends ESTestCase { } @Override - protected void closeChannels(List channel) throws IOException { + protected void closeChannels(List channel, boolean blocking) throws IOException { } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 5623005f7d9..58ce6ed5976 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -70,7 +70,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -341,8 +340,21 @@ public class Netty4Transport extends TcpTransport { } @Override - protected void closeChannels(final List channels) throws IOException { - Netty4Utils.closeChannels(channels); + protected void closeChannels(final List channels, boolean blocking) throws IOException { + if (blocking) { + Netty4Utils.closeChannels(channels); + } else { + for (Channel channel : channels) { + if (channel != null && channel.isOpen()) { + ChannelFuture closeFuture = channel.close(); + closeFuture.addListener((f) -> { + if (f.isSuccess() == false) { + logger.warn("failed to close channel", f.cause()); + } + }); + } 
+ } + } } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index f2849705e05..bbfccb8229a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -53,7 +53,6 @@ import java.net.SocketTimeoutException; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -243,7 +242,7 @@ public class MockTcpTransport extends TcpTransport } @Override - protected void closeChannels(List channel) throws IOException { + protected void closeChannels(List channel, boolean blocking) throws IOException { IOUtils.close(channel); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 335d438f577..90ebc858bbf 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -37,6 +37,7 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; +import org.elasticsearch.transport.nio.channel.CloseFuture; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import org.elasticsearch.transport.nio.channel.NioSocketChannel; @@ -45,7 +46,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; @@ -100,28 +100,29 @@ public class NioTransport extends TcpTransport { } @Override - protected void closeChannels(List channels) throws IOException { - IOException closingExceptions = null; + protected void closeChannels(List channels, boolean blocking) throws IOException { + ArrayList futures = new ArrayList<>(channels.size()); for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) { - try { - // If we are currently on the selector thread that handles this channel, we should prefer - // the closeFromSelector method. This method always closes the channel immediately. - ESSelector selector = channel.getSelector(); - if (selector != null && selector.isOnCurrentThread()) { - channel.closeFromSelector(); - } else { - channel.closeAsync().awaitClose(); - } - } catch (Exception e) { - if (closingExceptions == null) { - closingExceptions = new IOException("failed to close channels"); - } - closingExceptions.addSuppressed(e.getCause()); - } + futures.add(channel.closeAsync()); } } + if (blocking == false) { + return; + } + IOException closingExceptions = null; + for (CloseFuture future : futures) { + try { + future.awaitClose(); + IOException closeException = future.getCloseException(); + if (closeException != null) { + closingExceptions = addClosingException(closingExceptions, closeException); + } + } catch (InterruptedException e) { + closingExceptions = addClosingException(closingExceptions, e); + } + } if (closingExceptions != null) { throw closingExceptions; } @@ -226,47 +227,20 @@ public class NioTransport extends TcpTransport { onException(channel, t instanceof Exception ? 
(Exception) t : new ElasticsearchException(t)); } - private Settings createFallbackSettings() { - Settings.Builder fallbackSettingsBuilder = Settings.builder(); - - List fallbackBindHost = TcpTransport.BIND_HOST.get(settings); - if (fallbackBindHost.isEmpty() == false) { - fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost); - } - - List fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings); - if (fallbackPublishHost.isEmpty() == false) { - fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost); - } - - boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings); - fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay); - - boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings); - fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive); - - boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings);; - fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress); - - ByteSizeValue fallbackTcpSendBufferSize = TcpTransport.TCP_SEND_BUFFER_SIZE.get(settings); - if (fallbackTcpSendBufferSize.getBytes() >= 0) { - fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize); - } - - ByteSizeValue fallbackTcpBufferSize = TcpTransport.TCP_RECEIVE_BUFFER_SIZE.get(settings);; - if (fallbackTcpBufferSize.getBytes() >= 0) { - fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize); - } - - return fallbackSettingsBuilder.build(); - } - private NioClient createClient() { Supplier selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler); return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory); } + private IOException addClosingException(IOException closingExceptions, Exception e) { + if (closingExceptions == null) { + closingExceptions = new 
IOException("failed to close channels"); + } + closingExceptions.addSuppressed(e); + return closingExceptions; + } + class ClientChannelCloseListener implements Consumer { private final Consumer consumer;