From b68358945f2d3818bf9d7fae80923520b857cde2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 27 May 2019 11:44:36 +0200 Subject: [PATCH] Dump Stacktrace on Slow IO-Thread Operations (#42000) (#42572) * Dump Stacktrace on Slow IO-Thread Operations * Follow up to #39729 extending the functionality to actually dump the stack when the thread is blocked not afterwards * Logging the stacktrace after the thread became unblocked is only of limited use because we don't know what happened in the slow callback from that (only whether we were blocked on a read,write,connect etc.) * Relates #41745 --- .../transport/nio/MockNioTransport.java | 71 ++++++++++- .../transport/nio/TestEventHandler.java | 114 ++++++++++-------- .../transport/nio/TestEventHandlerTests.java | 13 +- 3 files changed, 143 insertions(+), 55 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 39316ca9192..a261d68cbb3 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; @@ -57,11 +58,16 @@ import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.IntFunction; +import java.util.stream.Collectors; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -70,6 +76,7 @@ public class MockNioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(MockNioTransport.class); private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); + private final TransportThreadWatchdog transportThreadWatchdog; private volatile NioSelectorGroup nioGroup; private volatile MockTcpChannelFactory clientChannelFactory; @@ -77,6 +84,7 @@ public class MockNioTransport extends TcpTransport { PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + this.transportThreadWatchdog = new TransportThreadWatchdog(threadPool); } @Override @@ -96,7 +104,7 @@ public class MockNioTransport extends TcpTransport { boolean success = false; try { nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, - (s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime)); + (s) -> new TestEventHandler(this::onNonChannelException, s, transportThreadWatchdog)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client"); @@ -125,6 +133,7 @@ public class MockNioTransport extends TcpTransport { @Override protected void stopInternal() { try { + transportThreadWatchdog.stop(); nioGroup.close(); } catch (Exception e) { 
logger.warn("unexpected exception while stopping nio group", e); @@ -319,4 +328,64 @@ public class MockNioTransport extends TcpTransport { getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener)); } } + + static final class TransportThreadWatchdog { + + private static final long WARN_THRESHOLD = TimeUnit.MILLISECONDS.toNanos(150); + + // Only check every 2s to not flood the logs on a blocked thread. + // We mostly care about long blocks and not random slowness anyway and in tests would randomly catch slow operations that block for + // less than 2s eventually. + private static final TimeValue CHECK_INTERVAL = TimeValue.timeValueSeconds(2); + + private final ThreadPool threadPool; + private final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + + private volatile boolean stopped; + + TransportThreadWatchdog(ThreadPool threadPool) { + this.threadPool = threadPool; + threadPool.schedule(this::logLongRunningExecutions, CHECK_INTERVAL, ThreadPool.Names.GENERIC); + } + + public boolean register() { + Long previousValue = registry.put(Thread.currentThread(), threadPool.relativeTimeInNanos()); + return previousValue == null; + } + + public void unregister() { + Long previousValue = registry.remove(Thread.currentThread()); + assert previousValue != null; + maybeLogElapsedTime(previousValue); + } + + private void maybeLogElapsedTime(long startTime) { + long elapsedTime = threadPool.relativeTimeInNanos() - startTime; + if (elapsedTime > WARN_THRESHOLD) { + logger.warn( + new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", + TimeUnit.NANOSECONDS.toMillis(elapsedTime)), + new RuntimeException("Slow exception on network thread")); + } + } + + private void logLongRunningExecutions() { + for (Map.Entry entry : registry.entrySet()) { + final long elapsedTime = threadPool.relativeTimeInNanos() - entry.getValue(); + if (elapsedTime > WARN_THRESHOLD) { + final Thread thread = entry.getKey(); + 
logger.warn("Slow execution on network thread [{}] [{} milliseconds]: \n{}", thread.getName(), + TimeUnit.NANOSECONDS.toMillis(elapsedTime), + Arrays.stream(thread.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n"))); + } + } + if (stopped == false) { + threadPool.scheduleUnlessShuttingDown(CHECK_INTERVAL, ThreadPool.Names.GENERIC, this::logLongRunningExecutions); + } + } + + public void stop() { + stopped = true; + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java index a70ecb0c59e..069e19c3455 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java @@ -19,9 +19,6 @@ package org.elasticsearch.transport.nio; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.nio.ChannelContext; import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.NioSelector; @@ -32,185 +29,202 @@ import java.io.IOException; import java.util.Collections; import java.util.Set; import java.util.WeakHashMap; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.LongSupplier; import java.util.function.Supplier; public class TestEventHandler extends EventHandler { - private static final Logger logger = LogManager.getLogger(TestEventHandler.class); - private final Set hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>()); private final Set hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>()); - private final LongSupplier relativeNanosSupplier; + private final MockNioTransport.TransportThreadWatchdog transportThreadWatchdog; - TestEventHandler(Consumer exceptionHandler, Supplier selectorSupplier, 
LongSupplier relativeNanosSupplier) { + TestEventHandler(Consumer exceptionHandler, Supplier selectorSupplier, + MockNioTransport.TransportThreadWatchdog transportThreadWatchdog) { super(exceptionHandler, selectorSupplier); - this.relativeNanosSupplier = relativeNanosSupplier; + this.transportThreadWatchdog = transportThreadWatchdog; } @Override protected void acceptChannel(ServerChannelContext context) throws IOException { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.acceptChannel(context); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void acceptException(ServerChannelContext context, Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.acceptException(context, exception); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void handleRegistration(ChannelContext context) throws IOException { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.handleRegistration(context); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void registrationException(ChannelContext context, Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.registrationException(context, exception); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } public void handleConnect(SocketChannelContext context) throws IOException { assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is 
not yet connected"; - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.handleConnect(context); if (context.isConnectComplete()) { hasConnectedMap.add(context); } } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } public void connectException(SocketChannelContext context, Exception e) { assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel"; + final boolean registered = transportThreadWatchdog.register(); hasConnectExceptionMap.add(context); - long startTime = relativeNanosSupplier.getAsLong(); try { super.connectException(context, e); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void handleRead(SocketChannelContext context) throws IOException { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.handleRead(context); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void readException(SocketChannelContext context, Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.readException(context, exception); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void handleWrite(SocketChannelContext context) throws IOException { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.handleWrite(context); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void 
writeException(SocketChannelContext context, Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.writeException(context, exception); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void handleTask(Runnable task) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.handleTask(task); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void taskException(Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.taskException(exception); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void handleClose(ChannelContext context) throws IOException { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.handleClose(context); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void closeException(ChannelContext context, Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.closeException(context, exception); } finally { - maybeLogElapsedTime(startTime); + if (registered) { + transportThreadWatchdog.unregister(); + } } } @Override protected void genericChannelException(ChannelContext context, Exception exception) { - long startTime = relativeNanosSupplier.getAsLong(); + final boolean registered = transportThreadWatchdog.register(); try { super.genericChannelException(context, exception); } finally { 
- maybeLogElapsedTime(startTime); - } - } - - private static final long WARN_THRESHOLD = 150; - - private void maybeLogElapsedTime(long startTime) { - long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime); - if (elapsedTime > WARN_THRESHOLD) { - logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime), - new RuntimeException("Slow exception on network thread")); + if (registered) { + transportThreadWatchdog.unregister(); + } } } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java index 2a570eb59b6..424d4922f02 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; import java.util.Map; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; public class TestEventHandlerTests extends ESTestCase { @@ -43,12 +45,12 @@ public class TestEventHandlerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); appender = new MockLogAppender(); - Loggers.addAppender(LogManager.getLogger(TestEventHandler.class), appender); + Loggers.addAppender(LogManager.getLogger(MockNioTransport.class), appender); appender.start(); } public void tearDown() throws Exception { - Loggers.removeAppender(LogManager.getLogger(TestEventHandler.class), appender); 
+ Loggers.removeAppender(LogManager.getLogger(MockNioTransport.class), appender); appender.stop(); super.tearDown(); } @@ -65,7 +67,10 @@ public class TestEventHandlerTests extends ESTestCase { } throw new IllegalStateException("Cannot update isStart"); }; - TestEventHandler eventHandler = new TestEventHandler((e) -> {}, () -> null, timeSupplier); + final ThreadPool threadPool = mock(ThreadPool.class); + doAnswer(i -> timeSupplier.getAsLong()).when(threadPool).relativeTimeInNanos(); + TestEventHandler eventHandler = + new TestEventHandler((e) -> {}, () -> null, new MockNioTransport.TransportThreadWatchdog(threadPool)); ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); SocketChannelContext socketChannelContext = mock(SocketChannelContext.class); @@ -91,7 +96,7 @@ public class TestEventHandlerTests extends ESTestCase { for (Map.Entry> entry : tests.entrySet()) { String message = "*Slow execution on network thread*"; MockLogAppender.LoggingExpectation slowExpectation = - new MockLogAppender.SeenEventExpectation(entry.getKey(), TestEventHandler.class.getCanonicalName(), Level.WARN, message); + new MockLogAppender.SeenEventExpectation(entry.getKey(), MockNioTransport.class.getCanonicalName(), Level.WARN, message); appender.addExpectation(slowExpectation); entry.getValue().run(); appender.assertAllExpectationsMatched();