#26701 Close TcpTransport on RST in some Spots to Prevent Leaking TIME_WAIT Sockets (#26764)

#26701 Added option to RST instead of FIN to TcpTransport#closeChannels
This commit is contained in:
Armin Braun 2017-09-26 19:58:11 +00:00 committed by GitHub
parent 6952f7b560
commit af06231d4c
5 changed files with 31 additions and 13 deletions

View File

@ -442,7 +442,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
public void close() throws IOException { public void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
try { try {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false); closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true);
} finally { } finally {
transportService.onConnectionClosed(this); transportService.onConnectionClosed(this);
} }
@ -640,7 +640,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final void closeChannelWhileHandlingExceptions(final Channel channel) { protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
if (isOpen(channel)) { if (isOpen(channel)) {
try { try {
closeChannels(Collections.singletonList(channel), false); closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e) { } catch (IOException e) {
logger.warn("failed to close channel", e); logger.warn("failed to close channel", e);
} }
@ -902,7 +902,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// first stop to accept any incoming connections so nobody can connect to this transport // first stop to accept any incoming connections so nobody can connect to this transport
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) { for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
try { try {
closeChannels(entry.getValue(), true); closeChannels(entry.getValue(), true, true);
} catch (Exception e) { } catch (Exception e) {
logger.debug( logger.debug(
(Supplier<?>) () -> new ParameterizedMessage( (Supplier<?>) () -> new ParameterizedMessage(
@ -975,7 +975,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override @Override
protected void innerInnerOnResponse(Channel channel) { protected void innerInnerOnResponse(Channel channel) {
try { try {
closeChannels(Collections.singletonList(channel), false); closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e1) { } catch (IOException e1) {
logger.debug("failed to close httpOnTransport channel", e1); logger.debug("failed to close httpOnTransport channel", e1);
} }
@ -984,7 +984,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override @Override
protected void innerOnFailure(Exception e) { protected void innerOnFailure(Exception e) {
try { try {
closeChannels(Collections.singletonList(channel), false); closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e1) { } catch (IOException e1) {
e.addSuppressed(e1); e.addSuppressed(e1);
logger.debug("failed to close httpOnTransport channel", e1); logger.debug("failed to close httpOnTransport channel", e1);
@ -1021,8 +1021,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
* *
* @param channels the channels to close * @param channels the channels to close
* @param blocking whether the channels should be closed synchronously * @param blocking whether the channels should be closed synchronously
* @param closingTransport whether we abort the connection on RST instead of FIN
*/ */
protected abstract void closeChannels(List<Channel> channels, boolean blocking) throws IOException; protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException;
/** /**
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception * Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception

View File

@ -191,7 +191,7 @@ public class TCPTransportTests extends ESTestCase {
} }
@Override @Override
protected void closeChannels(List channel, boolean blocking) throws IOException { protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException {
} }

View File

@ -331,7 +331,12 @@ public class Netty4Transport extends TcpTransport<Channel> {
} }
@Override @Override
protected void closeChannels(final List<Channel> channels, boolean blocking) throws IOException { protected void closeChannels(final List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (Channel channel : channels) {
channel.config().setOption(ChannelOption.SO_LINGER, 0);
}
}
if (blocking) { if (blocking) {
Netty4Utils.closeChannels(channels); Netty4Utils.closeChannels(channels);
} else { } else {

View File

@ -117,12 +117,12 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
@Override @Override
protected MockChannel bind(final String name, InetSocketAddress address) throws IOException { protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
MockServerSocket socket = new MockServerSocket(); MockServerSocket socket = new MockServerSocket();
socket.bind(address);
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings); ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.getBytes() > 0) { if (tcpReceiveBufferSize.getBytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt()); socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
} }
socket.bind(address);
MockChannel serverMockChannel = new MockChannel(socket, name); MockChannel serverMockChannel = new MockChannel(socket, name);
CountDownLatch started = new CountDownLatch(1); CountDownLatch started = new CountDownLatch(1);
executor.execute(new AbstractRunnable() { executor.execute(new AbstractRunnable() {
@ -242,8 +242,15 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
} }
@Override @Override
protected void closeChannels(List<MockChannel> channel, boolean blocking) throws IOException { protected void closeChannels(List<MockChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
IOUtils.close(channel); if (closingTransport) {
for (MockChannel channel : channels) {
if (channel.activeChannel != null) {
channel.activeChannel.setSoLinger(true, 0);
}
}
}
IOUtils.close(channels);
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.nio; package org.elasticsearch.transport.nio;
import java.net.StandardSocketOptions;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -28,7 +29,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -99,7 +99,12 @@ public class NioTransport extends TcpTransport<NioChannel> {
} }
@Override @Override
protected void closeChannels(List<NioChannel> channels, boolean blocking) throws IOException { protected void closeChannels(List<NioChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (NioChannel channel : channels) {
channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0);
}
}
ArrayList<CloseFuture> futures = new ArrayList<>(channels.size()); ArrayList<CloseFuture> futures = new ArrayList<>(channels.size());
for (final NioChannel channel : channels) { for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) { if (channel != null && channel.isOpen()) {