From af06231d4cb00d730b2689f628255634ef42fc83 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 26 Sep 2017 19:58:11 +0000 Subject: [PATCH] #26701 Close TcpTransport on RST in some Spots to Prevent Leaking TIME_WAIT Sockets (#26764) #26701 Added option to RST instead of FIN to TcpTransport#closeChannels --- .../org/elasticsearch/transport/TcpTransport.java | 13 +++++++------ .../elasticsearch/transport/TCPTransportTests.java | 2 +- .../transport/netty4/Netty4Transport.java | 7 ++++++- .../elasticsearch/transport/MockTcpTransport.java | 13 ++++++++++--- .../elasticsearch/transport/nio/NioTransport.java | 9 +++++++-- 5 files changed, 31 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 1af4f101e04..e9de9aaa3c8 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -442,7 +442,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false); + closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true); } finally { transportService.onConnectionClosed(this); } @@ -640,7 +640,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final void closeChannelWhileHandlingExceptions(final Channel channel) { if (isOpen(channel)) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e) { logger.warn("failed to close channel", e); } @@ -902,7 +902,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i // first stop to accept any incoming connections so nobody can connect to this transport for (Map.Entry> entry : serverChannels.entrySet()) { try { - closeChannels(entry.getValue(), true); + closeChannels(entry.getValue(), true, true); } catch (Exception e) { logger.debug( (Supplier) () -> new ParameterizedMessage( @@ -975,7 +975,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override protected void innerInnerOnResponse(Channel channel) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e1) { logger.debug("failed to close httpOnTransport channel", e1); } @@ -984,7 +984,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override protected void innerOnFailure(Exception e) { try { - closeChannels(Collections.singletonList(channel), false); + closeChannels(Collections.singletonList(channel), false, false); } catch (IOException e1) { e.addSuppressed(e1); logger.debug("failed to close httpOnTransport channel", e1); @@ -1021,8 +1021,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i * * @param channels the channels to close * @param blocking whether the channels should be closed synchronously + * @param closingTransport whether we abort the connection on RST instead of FIN */ - protected abstract void closeChannels(List channels, boolean blocking) throws IOException; + protected abstract void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException; /** * Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index c386b2865af..55457cc8ae4 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -191,7 +191,7 @@ public class TCPTransportTests extends ESTestCase { } @Override - protected void closeChannels(List channel, boolean blocking) throws IOException { + protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException { } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d92066e1fcc..a196976dc12 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -331,7 +331,12 @@ public class Netty4Transport extends TcpTransport { } @Override - protected void closeChannels(final List channels, boolean blocking) throws IOException { + protected void closeChannels(final List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (Channel channel : channels) { + channel.config().setOption(ChannelOption.SO_LINGER, 0); + } + } if (blocking) { Netty4Utils.closeChannels(channels); } else { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index bbfccb8229a..085469059bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -117,12 +117,12 @@ public class MockTcpTransport extends TcpTransport @Override protected MockChannel bind(final String name, InetSocketAddress address) throws IOException { MockServerSocket socket = new MockServerSocket(); - socket.bind(address); socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings); if (tcpReceiveBufferSize.getBytes() > 0) { socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); } + socket.bind(address); MockChannel serverMockChannel = new MockChannel(socket, name); CountDownLatch started = new CountDownLatch(1); executor.execute(new AbstractRunnable() { @@ -242,8 +242,15 @@ public class MockTcpTransport extends TcpTransport } @Override - protected void closeChannels(List channel, boolean blocking) throws IOException { - IOUtils.close(channel); + protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (MockChannel channel : channels) { + if (channel.activeChannel != null) { + channel.activeChannel.setSoLinger(true, 0); + } + } + } + IOUtils.close(channels); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 5a9dba29f5e..b22feb56976 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio; +import java.net.StandardSocketOptions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -28,7 +29,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -99,7 +99,12 @@ public class NioTransport extends TcpTransport { } @Override - protected void closeChannels(List channels, boolean blocking) throws IOException { + protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { + if (closingTransport) { + for (NioChannel channel : channels) { + channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0); + } + } ArrayList futures = new ArrayList<>(channels.size()); for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) {