Avoid blocking on channel close on network thread (#25521)
Currently when we close a channel in Netty4Utils.closeChannels we block until the closing is complete. This introduces the possibility that a network selector thread will block while waiting until a separate network selector thread closes a channel. For instance: T1 closes channel 1 (which is assigned to a T1 selector). Channel 1's close listener executes the closing of the node. That means that T1 now tries to close channel 2. However, channel 2 is assigned to a selector that is running on T2. T1 now must wait until T2 closes that channel at some point in the future. This commit addresses this by adding a boolean to closeChannels indicating if we should block on close. We only set this boolean to true if we are closing down the server channels at shutdown. This call is never made from a network thread. When we call the closeChannels method with that boolean set to false, we do not block on close.
This commit is contained in:
parent
8cf0528001
commit
b22bbf94da
|
@ -442,7 +442,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (closed.compareAndSet(false, true)) {
|
if (closed.compareAndSet(false, true)) {
|
||||||
try {
|
try {
|
||||||
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()));
|
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false);
|
||||||
} finally {
|
} finally {
|
||||||
transportServiceAdapter.onConnectionClosed(this);
|
transportServiceAdapter.onConnectionClosed(this);
|
||||||
}
|
}
|
||||||
|
@ -640,7 +640,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
|
protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
|
||||||
if (isOpen(channel)) {
|
if (isOpen(channel)) {
|
||||||
try {
|
try {
|
||||||
closeChannels(Collections.singletonList(channel));
|
closeChannels(Collections.singletonList(channel), false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("failed to close channel", e);
|
logger.warn("failed to close channel", e);
|
||||||
}
|
}
|
||||||
|
@ -902,7 +902,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
// first stop to accept any incoming connections so nobody can connect to this transport
|
// first stop to accept any incoming connections so nobody can connect to this transport
|
||||||
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
|
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
|
||||||
try {
|
try {
|
||||||
closeChannels(entry.getValue());
|
closeChannels(entry.getValue(), true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
(Supplier<?>) () -> new ParameterizedMessage(
|
(Supplier<?>) () -> new ParameterizedMessage(
|
||||||
|
@ -975,7 +975,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
@Override
|
@Override
|
||||||
protected void innerInnerOnResponse(Channel channel) {
|
protected void innerInnerOnResponse(Channel channel) {
|
||||||
try {
|
try {
|
||||||
closeChannels(Collections.singletonList(channel));
|
closeChannels(Collections.singletonList(channel), false);
|
||||||
} catch (IOException e1) {
|
} catch (IOException e1) {
|
||||||
logger.debug("failed to close httpOnTransport channel", e1);
|
logger.debug("failed to close httpOnTransport channel", e1);
|
||||||
}
|
}
|
||||||
|
@ -984,7 +984,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
@Override
|
@Override
|
||||||
protected void innerOnFailure(Exception e) {
|
protected void innerOnFailure(Exception e) {
|
||||||
try {
|
try {
|
||||||
closeChannels(Collections.singletonList(channel));
|
closeChannels(Collections.singletonList(channel), false);
|
||||||
} catch (IOException e1) {
|
} catch (IOException e1) {
|
||||||
e.addSuppressed(e1);
|
e.addSuppressed(e1);
|
||||||
logger.debug("failed to close httpOnTransport channel", e1);
|
logger.debug("failed to close httpOnTransport channel", e1);
|
||||||
|
@ -1015,9 +1015,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
||||||
protected abstract Channel bind(String name, InetSocketAddress address) throws IOException;
|
protected abstract Channel bind(String name, InetSocketAddress address) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes all channels in this list
|
* Closes all channels in this list. If the blocking boolean is set to true, the channels must be
|
||||||
|
* closed before the method returns. This should never be called with blocking set to true from a
|
||||||
|
* network thread.
|
||||||
|
*
|
||||||
|
* @param channels the channels to close
|
||||||
|
* @param blocking whether the channels should be closed synchronously
|
||||||
*/
|
*/
|
||||||
protected abstract void closeChannels(List<Channel> channel) throws IOException;
|
protected abstract void closeChannels(List<Channel> channels, boolean blocking) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception
|
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception
|
||||||
|
|
|
@ -191,7 +191,7 @@ public class TCPTransportTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void closeChannels(List channel) throws IOException {
|
protected void closeChannels(List channel, boolean blocking) throws IOException {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,6 @@ import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -341,8 +340,21 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void closeChannels(final List<Channel> channels) throws IOException {
|
protected void closeChannels(final List<Channel> channels, boolean blocking) throws IOException {
|
||||||
|
if (blocking) {
|
||||||
Netty4Utils.closeChannels(channels);
|
Netty4Utils.closeChannels(channels);
|
||||||
|
} else {
|
||||||
|
for (Channel channel : channels) {
|
||||||
|
if (channel != null && channel.isOpen()) {
|
||||||
|
ChannelFuture closeFuture = channel.close();
|
||||||
|
closeFuture.addListener((f) -> {
|
||||||
|
if (f.isSuccess() == false) {
|
||||||
|
logger.warn("failed to close channel", f.cause());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -53,7 +53,6 @@ import java.net.SocketTimeoutException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -243,7 +242,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void closeChannels(List<MockChannel> channel) throws IOException {
|
protected void closeChannels(List<MockChannel> channel, boolean blocking) throws IOException {
|
||||||
IOUtils.close(channel);
|
IOUtils.close(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.transport.ConnectionProfile;
|
||||||
import org.elasticsearch.transport.TcpTransport;
|
import org.elasticsearch.transport.TcpTransport;
|
||||||
import org.elasticsearch.transport.Transports;
|
import org.elasticsearch.transport.Transports;
|
||||||
import org.elasticsearch.transport.nio.channel.ChannelFactory;
|
import org.elasticsearch.transport.nio.channel.ChannelFactory;
|
||||||
|
import org.elasticsearch.transport.nio.channel.CloseFuture;
|
||||||
import org.elasticsearch.transport.nio.channel.NioChannel;
|
import org.elasticsearch.transport.nio.channel.NioChannel;
|
||||||
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
|
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
|
||||||
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
|
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
|
||||||
|
@ -45,7 +46,6 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -100,28 +100,29 @@ public class NioTransport extends TcpTransport<NioChannel> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void closeChannels(List<NioChannel> channels) throws IOException {
|
protected void closeChannels(List<NioChannel> channels, boolean blocking) throws IOException {
|
||||||
IOException closingExceptions = null;
|
ArrayList<CloseFuture> futures = new ArrayList<>(channels.size());
|
||||||
for (final NioChannel channel : channels) {
|
for (final NioChannel channel : channels) {
|
||||||
if (channel != null && channel.isOpen()) {
|
if (channel != null && channel.isOpen()) {
|
||||||
try {
|
futures.add(channel.closeAsync());
|
||||||
// If we are currently on the selector thread that handles this channel, we should prefer
|
|
||||||
// the closeFromSelector method. This method always closes the channel immediately.
|
|
||||||
ESSelector selector = channel.getSelector();
|
|
||||||
if (selector != null && selector.isOnCurrentThread()) {
|
|
||||||
channel.closeFromSelector();
|
|
||||||
} else {
|
|
||||||
channel.closeAsync().awaitClose();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (closingExceptions == null) {
|
|
||||||
closingExceptions = new IOException("failed to close channels");
|
|
||||||
}
|
|
||||||
closingExceptions.addSuppressed(e.getCause());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (blocking == false) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IOException closingExceptions = null;
|
||||||
|
for (CloseFuture future : futures) {
|
||||||
|
try {
|
||||||
|
future.awaitClose();
|
||||||
|
IOException closeException = future.getCloseException();
|
||||||
|
if (closeException != null) {
|
||||||
|
closingExceptions = addClosingException(closingExceptions, closeException);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
closingExceptions = addClosingException(closingExceptions, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (closingExceptions != null) {
|
if (closingExceptions != null) {
|
||||||
throw closingExceptions;
|
throw closingExceptions;
|
||||||
}
|
}
|
||||||
|
@ -226,47 +227,20 @@ public class NioTransport extends TcpTransport<NioChannel> {
|
||||||
onException(channel, t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
|
onException(channel, t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Settings createFallbackSettings() {
|
|
||||||
Settings.Builder fallbackSettingsBuilder = Settings.builder();
|
|
||||||
|
|
||||||
List<String> fallbackBindHost = TcpTransport.BIND_HOST.get(settings);
|
|
||||||
if (fallbackBindHost.isEmpty() == false) {
|
|
||||||
fallbackSettingsBuilder.putArray("bind_host", fallbackBindHost);
|
|
||||||
}
|
|
||||||
|
|
||||||
List<String> fallbackPublishHost = TcpTransport.PUBLISH_HOST.get(settings);
|
|
||||||
if (fallbackPublishHost.isEmpty() == false) {
|
|
||||||
fallbackSettingsBuilder.putArray("publish_host", fallbackPublishHost);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean fallbackTcpNoDelay = TcpTransport.TCP_NO_DELAY.get(settings);
|
|
||||||
fallbackSettingsBuilder.put("tcp_no_delay", fallbackTcpNoDelay);
|
|
||||||
|
|
||||||
boolean fallbackTcpKeepAlive = TcpTransport.TCP_KEEP_ALIVE.get(settings);
|
|
||||||
fallbackSettingsBuilder.put("tcp_keep_alive", fallbackTcpKeepAlive);
|
|
||||||
|
|
||||||
boolean fallbackReuseAddress = TcpTransport.TCP_REUSE_ADDRESS.get(settings);;
|
|
||||||
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
|
|
||||||
|
|
||||||
ByteSizeValue fallbackTcpSendBufferSize = TcpTransport.TCP_SEND_BUFFER_SIZE.get(settings);
|
|
||||||
if (fallbackTcpSendBufferSize.getBytes() >= 0) {
|
|
||||||
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteSizeValue fallbackTcpBufferSize = TcpTransport.TCP_RECEIVE_BUFFER_SIZE.get(settings);;
|
|
||||||
if (fallbackTcpBufferSize.getBytes() >= 0) {
|
|
||||||
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
return fallbackSettingsBuilder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private NioClient createClient() {
|
private NioClient createClient() {
|
||||||
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
|
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
|
||||||
ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler);
|
ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler);
|
||||||
return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory);
|
return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private IOException addClosingException(IOException closingExceptions, Exception e) {
|
||||||
|
if (closingExceptions == null) {
|
||||||
|
closingExceptions = new IOException("failed to close channels");
|
||||||
|
}
|
||||||
|
closingExceptions.addSuppressed(e);
|
||||||
|
return closingExceptions;
|
||||||
|
}
|
||||||
|
|
||||||
class ClientChannelCloseListener implements Consumer<NioChannel> {
|
class ClientChannelCloseListener implements Consumer<NioChannel> {
|
||||||
|
|
||||||
private final Consumer<NioChannel> consumer;
|
private final Consumer<NioChannel> consumer;
|
||||||
|
|
Loading…
Reference in New Issue