* Dump Stacktrace on Slow IO-Thread Operations * Follow-up to #39729, extending the functionality to actually dump the stack while the thread is blocked, not afterwards * Logging the stacktrace after the thread has become unblocked is of only limited use, because it does not tell us what happened in the slow callback (only whether we were blocked on a read, write, connect, etc.) * Relates #41745
This commit is contained in:
parent
489616da62
commit
b68358945f
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.nio.BytesChannelContext;
|
||||
|
@ -57,11 +58,16 @@ import java.net.StandardSocketOptions;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||
|
@ -70,6 +76,7 @@ public class MockNioTransport extends TcpTransport {
|
|||
private static final Logger logger = LogManager.getLogger(MockNioTransport.class);
|
||||
|
||||
private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
|
||||
private final TransportThreadWatchdog transportThreadWatchdog;
|
||||
private volatile NioSelectorGroup nioGroup;
|
||||
private volatile MockTcpChannelFactory clientChannelFactory;
|
||||
|
||||
|
@ -77,6 +84,7 @@ public class MockNioTransport extends TcpTransport {
|
|||
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
this.transportThreadWatchdog = new TransportThreadWatchdog(threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,7 +104,7 @@ public class MockNioTransport extends TcpTransport {
|
|||
boolean success = false;
|
||||
try {
|
||||
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
|
||||
(s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));
|
||||
(s) -> new TestEventHandler(this::onNonChannelException, s, transportThreadWatchdog));
|
||||
|
||||
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
|
||||
clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");
|
||||
|
@ -125,6 +133,7 @@ public class MockNioTransport extends TcpTransport {
|
|||
@Override
|
||||
protected void stopInternal() {
|
||||
try {
|
||||
transportThreadWatchdog.stop();
|
||||
nioGroup.close();
|
||||
} catch (Exception e) {
|
||||
logger.warn("unexpected exception while stopping nio group", e);
|
||||
|
@ -319,4 +328,64 @@ public class MockNioTransport extends TcpTransport {
|
|||
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
|
||||
}
|
||||
}
|
||||
|
||||
static final class TransportThreadWatchdog {
|
||||
|
||||
private static final long WARN_THRESHOLD = TimeUnit.MILLISECONDS.toNanos(150);
|
||||
|
||||
// Only check every 2s to not flood the logs on a blocked thread.
|
||||
// We mostly care about long blocks and not random slowness anyway and in tests would randomly catch slow operations that block for
|
||||
// less than 2s eventually.
|
||||
private static final TimeValue CHECK_INTERVAL = TimeValue.timeValueSeconds(2);
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
|
||||
|
||||
private volatile boolean stopped;
|
||||
|
||||
TransportThreadWatchdog(ThreadPool threadPool) {
|
||||
this.threadPool = threadPool;
|
||||
threadPool.schedule(this::logLongRunningExecutions, CHECK_INTERVAL, ThreadPool.Names.GENERIC);
|
||||
}
|
||||
|
||||
public boolean register() {
|
||||
Long previousValue = registry.put(Thread.currentThread(), threadPool.relativeTimeInNanos());
|
||||
return previousValue == null;
|
||||
}
|
||||
|
||||
public void unregister() {
|
||||
Long previousValue = registry.remove(Thread.currentThread());
|
||||
assert previousValue != null;
|
||||
maybeLogElapsedTime(previousValue);
|
||||
}
|
||||
|
||||
private void maybeLogElapsedTime(long startTime) {
|
||||
long elapsedTime = threadPool.relativeTimeInNanos() - startTime;
|
||||
if (elapsedTime > WARN_THRESHOLD) {
|
||||
logger.warn(
|
||||
new ParameterizedMessage("Slow execution on network thread [{} milliseconds]",
|
||||
TimeUnit.NANOSECONDS.toMillis(elapsedTime)),
|
||||
new RuntimeException("Slow exception on network thread"));
|
||||
}
|
||||
}
|
||||
|
||||
private void logLongRunningExecutions() {
|
||||
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
|
||||
final long elapsedTime = threadPool.relativeTimeInMillis() - entry.getValue();
|
||||
if (elapsedTime > WARN_THRESHOLD) {
|
||||
final Thread thread = entry.getKey();
|
||||
logger.warn("Slow execution on network thread [{}] [{} milliseconds]: \n{}", thread.getName(),
|
||||
TimeUnit.NANOSECONDS.toMillis(elapsedTime),
|
||||
Arrays.stream(thread.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n")));
|
||||
}
|
||||
}
|
||||
if (stopped == false) {
|
||||
threadPool.scheduleUnlessShuttingDown(CHECK_INTERVAL, ThreadPool.Names.GENERIC, this::logLongRunningExecutions);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
stopped = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,9 +19,6 @@
|
|||
|
||||
package org.elasticsearch.transport.nio;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.nio.ChannelContext;
|
||||
import org.elasticsearch.nio.EventHandler;
|
||||
import org.elasticsearch.nio.NioSelector;
|
||||
|
@ -32,185 +29,202 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class TestEventHandler extends EventHandler {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TestEventHandler.class);
|
||||
|
||||
private final Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
|
||||
private final Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
|
||||
private final LongSupplier relativeNanosSupplier;
|
||||
private final MockNioTransport.TransportThreadWatchdog transportThreadWatchdog;
|
||||
|
||||
TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, LongSupplier relativeNanosSupplier) {
|
||||
TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier,
|
||||
MockNioTransport.TransportThreadWatchdog transportThreadWatchdog) {
|
||||
super(exceptionHandler, selectorSupplier);
|
||||
this.relativeNanosSupplier = relativeNanosSupplier;
|
||||
this.transportThreadWatchdog = transportThreadWatchdog;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acceptChannel(ServerChannelContext context) throws IOException {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.acceptChannel(context);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acceptException(ServerChannelContext context, Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.acceptException(context, exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleRegistration(ChannelContext<?> context) throws IOException {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.handleRegistration(context);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void registrationException(ChannelContext<?> context, Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.registrationException(context, exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void handleConnect(SocketChannelContext context) throws IOException {
|
||||
assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.handleConnect(context);
|
||||
if (context.isConnectComplete()) {
|
||||
hasConnectedMap.add(context);
|
||||
}
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void connectException(SocketChannelContext context, Exception e) {
|
||||
assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
hasConnectExceptionMap.add(context);
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
try {
|
||||
super.connectException(context, e);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleRead(SocketChannelContext context) throws IOException {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.handleRead(context);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void readException(SocketChannelContext context, Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.readException(context, exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleWrite(SocketChannelContext context) throws IOException {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.handleWrite(context);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeException(SocketChannelContext context, Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.writeException(context, exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleTask(Runnable task) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.handleTask(task);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void taskException(Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.taskException(exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleClose(ChannelContext<?> context) throws IOException {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.handleClose(context);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeException(ChannelContext<?> context, Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.closeException(context, exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
|
||||
long startTime = relativeNanosSupplier.getAsLong();
|
||||
final boolean registered = transportThreadWatchdog.register();
|
||||
try {
|
||||
super.genericChannelException(context, exception);
|
||||
} finally {
|
||||
maybeLogElapsedTime(startTime);
|
||||
}
|
||||
}
|
||||
|
||||
private static final long WARN_THRESHOLD = 150;
|
||||
|
||||
private void maybeLogElapsedTime(long startTime) {
|
||||
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime);
|
||||
if (elapsedTime > WARN_THRESHOLD) {
|
||||
logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime),
|
||||
new RuntimeException("Slow exception on network thread"));
|
||||
if (registered) {
|
||||
transportThreadWatchdog.unregister();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.nio.ServerChannelContext;
|
|||
import org.elasticsearch.nio.SocketChannelContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestEventHandlerTests extends ESTestCase {
|
||||
|
@ -43,12 +45,12 @@ public class TestEventHandlerTests extends ESTestCase {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
appender = new MockLogAppender();
|
||||
Loggers.addAppender(LogManager.getLogger(TestEventHandler.class), appender);
|
||||
Loggers.addAppender(LogManager.getLogger(MockNioTransport.class), appender);
|
||||
appender.start();
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
Loggers.removeAppender(LogManager.getLogger(TestEventHandler.class), appender);
|
||||
Loggers.removeAppender(LogManager.getLogger(MockNioTransport.class), appender);
|
||||
appender.stop();
|
||||
super.tearDown();
|
||||
}
|
||||
|
@ -65,7 +67,10 @@ public class TestEventHandlerTests extends ESTestCase {
|
|||
}
|
||||
throw new IllegalStateException("Cannot update isStart");
|
||||
};
|
||||
TestEventHandler eventHandler = new TestEventHandler((e) -> {}, () -> null, timeSupplier);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
doAnswer(i -> timeSupplier.getAsLong()).when(threadPool).relativeTimeInNanos();
|
||||
TestEventHandler eventHandler =
|
||||
new TestEventHandler((e) -> {}, () -> null, new MockNioTransport.TransportThreadWatchdog(threadPool));
|
||||
|
||||
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
|
||||
SocketChannelContext socketChannelContext = mock(SocketChannelContext.class);
|
||||
|
@ -91,7 +96,7 @@ public class TestEventHandlerTests extends ESTestCase {
|
|||
for (Map.Entry<String, CheckedRunnable<Exception>> entry : tests.entrySet()) {
|
||||
String message = "*Slow execution on network thread*";
|
||||
MockLogAppender.LoggingExpectation slowExpectation =
|
||||
new MockLogAppender.SeenEventExpectation(entry.getKey(), TestEventHandler.class.getCanonicalName(), Level.WARN, message);
|
||||
new MockLogAppender.SeenEventExpectation(entry.getKey(), MockNioTransport.class.getCanonicalName(), Level.WARN, message);
|
||||
appender.addExpectation(slowExpectation);
|
||||
entry.getValue().run();
|
||||
appender.assertAllExpectationsMatched();
|
||||
|
|
Loading…
Reference in New Issue