Add NioTransport threads to thread name checks (#25477)

We have various assertions that check we never block on transport
threads. This commit adds the thread names for the NioTransport to
these assertions.

With this change I had to fix two places where we were calling blocking
methods from the transport threads.
This commit is contained in:
Tim Brooks 2017-06-29 15:16:07 -05:00 committed by GitHub
parent c32c21e875
commit cac2eec7d2
4 changed files with 20 additions and 11 deletions

View File

@ -29,6 +29,9 @@ public enum Transports {
/** threads whose name is prefixed by this string will be considered network threads, even though they aren't */
public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread";
public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker";
public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor";
/**
* Utility method to detect whether a thread is a network thread. Typically
* used in assertions to make sure that we do not call blocking code from
@ -40,7 +43,9 @@ public enum Transports {
HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
TEST_MOCK_TRANSPORT_THREAD_PREFIX,
NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX,
NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) {
if (threadName.contains(s)) {
return true;
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
@ -57,9 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class NioTransport extends TcpTransport<NioChannel> {
// TODO: Need to add to places where we check if transport thread
public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor";
public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX;
public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
@ -108,7 +108,14 @@ public class NioTransport extends TcpTransport<NioChannel> {
for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) {
try {
channel.closeAsync().awaitClose();
// If we are currently on the selector thread that handles this channel, we should prefer
// the closeFromSelector method. This method always closes the channel immediately.
ESSelector selector = channel.getSelector();
if (selector != null && selector.isOnCurrentThread()) {
channel.closeFromSelector();
} else {
channel.closeAsync().awaitClose();
}
} catch (Exception e) {
if (closingExceptions == null) {
closingExceptions = new IOException("failed to close channels");

View File

@ -102,11 +102,6 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
*/
@Override
public CloseFuture closeAsync() {
if (selector != null && selector.isOnCurrentThread()) {
closeFromSelector();
return closeFuture;
}
for (; ; ) {
int state = this.state.get();
if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) {

View File

@ -80,12 +80,14 @@ public class ConnectFuture extends BaseFuture<NioSocketChannel> {
if (isDone()) {
try {
// Get should always return without blocking as we already checked 'isDone'
return super.get();
return super.get(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
return null;
} catch (TimeoutException e) {
throw new AssertionError("This should never happen as we only call get() after isDone() is true.");
}
} else {
return null;