Avoid leaks in Long GC disruption tests

We can leak disrupted threads since we never wait for them to complete
after freeing them from their loops. This commit addresses this by
joining on disrupted threads, and handles the fallout from joining:
suspended threads must be resumed in a finally block, and the tests must
track every thread they start so that they can join on them.

Relates #24338
Author: Jason Tedor, 2017-04-26 15:26:36 -04:00 (committed by GitHub)
Commit: 2ed1f7a339 (parent: b7bf651738)
2 changed files with 57 additions and 18 deletions
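
The core of the fix is the join-in-finally pattern: setting a stop flag frees the worker threads from their loops, but the test can still return while they are running unless it joins on them. A minimal standalone sketch of the leak and its fix (class and variable names here are illustrative, not from this commit):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class JoinOnStop {
        public static void main(String[] args) throws InterruptedException {
            final AtomicBoolean stop = new AtomicBoolean();
            final Thread[] threads = new Thread[4];
            try {
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Thread(() -> {
                        while (stop.get() == false) {
                            Thread.yield(); // stand-in for the test's real work loop
                        }
                    });
                    threads[i].start();
                }
                // ... the body of the test runs here ...
            } finally {
                stop.set(true);
                // without this loop the threads are freed from their loops but
                // may still be running when the test returns, i.e. they leak
                for (final Thread thread : threads) {
                    thread.join();
                }
            }
        }
    }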

File: LongGCDisruption.java

@@ -106,6 +106,15 @@ public class LongGCDisruption extends SingleNodeDisruption {
                 logger.warn("failed to stop node [{}]'s threads within [{}] millis. Stopping thread stack trace:\n {}"
                     , disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread.getStackTrace()));
                 stoppingThread.interrupt(); // best effort;
+                try {
+                    /*
+                     * We need to join on the stopping thread in case it has stopped a thread that is in a critical section and needs to
+                     * be resumed.
+                     */
+                    stoppingThread.join();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
                 throw new RuntimeException("stopping node threads took too long");
             }
             // block detection checks if other threads are blocked waiting on an object that is held by one
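
For context on why the join is needed on the timeout path: interrupt() only requests that a thread stop, it does not wait for the thread to finish whatever cleanup its interrupt handler performs. Joining is what guarantees that cleanup (here, resuming any thread the stopping thread had suspended) has completed before the failure propagates. A small standalone sketch of the distinction (hypothetical names, not from the commit):

    public class InterruptThenJoin {
        public static void main(String[] args) throws InterruptedException {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    // cleanup (such as resuming a suspended thread) runs here,
                    // strictly before the worker terminates
                }
            });
            worker.start();
            worker.interrupt(); // asynchronous: only requests the stop
            worker.join();      // synchronous: returns after the cleanup has run
        }
    }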
@@ -228,23 +237,39 @@ public class LongGCDisruption extends SingleNodeDisruption {
                 if (thread.isAlive() && nodeThreads.add(thread)) {
                     liveThreadsFound = true;
                     logger.trace("stopping thread [{}]", threadName);
-                    thread.suspend();
-                    // double check the thread is not in a shared resource like logging. If so, let it go and come back..
-                    boolean safe = true;
-                    safe:
-                    for (StackTraceElement stackElement : thread.getStackTrace()) {
-                        String className = stackElement.getClassName();
-                        for (Pattern unsafePattern : getUnsafeClasses()) {
-                            if (unsafePattern.matcher(className).find()) {
-                                safe = false;
-                                break safe;
-                            }
-                        }
-                    }
-                    if (!safe) {
-                        logger.trace("resuming thread [{}] as it is in a critical section", threadName);
-                        thread.resume();
-                        nodeThreads.remove(thread);
-                    }
+                    // we assume it is not safe to suspend the thread
+                    boolean safe = false;
+                    try {
+                        /*
+                         * At the bottom of this try-block we will know whether or not it is safe to suspend this thread; we start by
+                         * assuming that it is safe.
+                         */
+                        boolean definitelySafe = true;
+                        thread.suspend();
+                        // double check the thread is not in a shared resource like logging; if so, let it go and come back
+                        safe:
+                        for (StackTraceElement stackElement : thread.getStackTrace()) {
+                            String className = stackElement.getClassName();
+                            for (Pattern unsafePattern : getUnsafeClasses()) {
+                                if (unsafePattern.matcher(className).find()) {
+                                    // it is definitely not safe to suspend the thread
+                                    definitelySafe = false;
+                                    break safe;
+                                }
+                            }
+                        }
+                        safe = definitelySafe;
+                    } finally {
+                        if (!safe) {
+                            /*
+                             * Do not log before resuming as we might be interrupted while logging in which case we will throw an
+                             * interrupted exception and never resume the stopped thread that is in a critical section. Also, logging
+                             * before resuming makes for confusing log messages if we never hit the resume.
+                             */
+                            thread.resume();
+                            logger.trace("resumed thread [{}] as it is in a critical section", threadName);
+                            nodeThreads.remove(thread);
+                        }
+                    }
                 }
             }
         }
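
The restructuring above is a resume-in-finally shape: assume the suspension is unsafe, prove it safe, and guarantee the resume on every other path, including an exception thrown between suspend() and the safety check. A condensed sketch of that shape under assumed names (the UNSAFE pattern stands in for getUnsafeClasses(), which this sketch does not reproduce):

    import java.util.regex.Pattern;

    public class SuspendSafely {

        // hypothetical stand-in for the disruption's getUnsafeClasses()
        private static final Pattern UNSAFE = Pattern.compile("java\\.util\\.logging");

        @SuppressWarnings("deprecation") // suspend/resume are deprecated; test-only usage
        static boolean suspendIfSafe(Thread thread) {
            boolean safe = false;
            try {
                thread.suspend();
                boolean definitelySafe = true;
                for (StackTraceElement element : thread.getStackTrace()) {
                    if (UNSAFE.matcher(element.getClassName()).find()) {
                        definitelySafe = false;
                        break;
                    }
                }
                safe = definitelySafe;
            } finally {
                if (safe == false) {
                    // runs even if getStackTrace or the matcher threw, so the
                    // thread is never left suspended inside a critical section
                    thread.resume();
                }
            }
            return safe;
        }
    }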

File: LongGCDisruptionTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.test.ESTestCase;
 
 import java.lang.management.ThreadInfo;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,8 +71,8 @@ public class LongGCDisruptionTests extends ESTestCase {
         final CountDownLatch pauseUnderLock = new CountDownLatch(1);
         final LockedExecutor lockedExecutor = new LockedExecutor();
         final AtomicLong ops = new AtomicLong();
+        final Thread[] threads = new Thread[10];
         try {
-            Thread[] threads = new Thread[10];
             for (int i = 0; i < 10; i++) {
                 // at least one locked and one none lock thread
                 final boolean lockedExec = (i < 9 && randomBoolean()) || i == 0;
@@ -101,6 +103,9 @@ public class LongGCDisruptionTests extends ESTestCase {
         } finally {
             stop.set(true);
             pauseUnderLock.countDown();
+            for (final Thread thread : threads) {
+                thread.join();
+            }
         }
     }
@@ -121,8 +126,8 @@ public class LongGCDisruptionTests extends ESTestCase {
         final AtomicBoolean stop = new AtomicBoolean();
         final LockedExecutor lockedExecutor = new LockedExecutor();
         final AtomicLong ops = new AtomicLong();
+        final Thread[] threads = new Thread[10];
         try {
-            Thread[] threads = new Thread[10];
             for (int i = 0; i < 10; i++) {
                 threads[i] = new Thread(() -> {
                     for (int iter = 0; stop.get() == false; iter++) {
@@ -150,6 +155,9 @@ public class LongGCDisruptionTests extends ESTestCase {
             assertBusy(() -> assertThat(ops.get(), greaterThan(first)));
         } finally {
             stop.set(true);
+            for (final Thread thread : threads) {
+                thread.join();
+            }
         }
     }
@@ -183,6 +191,7 @@ public class LongGCDisruptionTests extends ESTestCase {
         final CountDownLatch pauseUnderLock = new CountDownLatch(1);
         final LockedExecutor lockedExecutor = new LockedExecutor();
         final AtomicLong ops = new AtomicLong();
+        final List<Thread> threads = new ArrayList<>();
         try {
             for (int i = 0; i < 5; i++) {
                 // at least one locked and one none lock thread
@@ -206,6 +215,7 @@ public class LongGCDisruptionTests extends ESTestCase {
                 });
                 thread.setName("[" + disruptedNodeName + "][" + i + "]");
+                threads.add(thread);
                 thread.start();
             }
 
@@ -224,12 +234,13 @@ public class LongGCDisruptionTests extends ESTestCase {
                 });
                 thread.setName("[" + blockedNodeName + "][" + i + "]");
+                threads.add(thread);
                 thread.start();
             }
             // make sure some threads of test_node are under lock
             underLock.await();
             disruption.startDisrupting();
-            waitForBlockDetectionResult.await(30, TimeUnit.SECONDS);
+            assertTrue(waitForBlockDetectionResult.await(30, TimeUnit.SECONDS));
             disruption.stopDisrupting();
 
             ThreadInfo threadInfo = blockDetectionResult.get();
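
The assertTrue wrapper in the hunk above matters because CountDownLatch#await(long, TimeUnit) reports a timeout through its boolean return value rather than by throwing; a bare call silently swallows the timeout and lets the test proceed on stale state. A tiny sketch of the pattern, assuming it sits inside a JUnit-style test method with the test's existing imports (CountDownLatch, TimeUnit, assertTrue):

    final CountDownLatch latch = new CountDownLatch(1);
    new Thread(latch::countDown).start();
    // asserting on the return value turns a silent timeout into a test failure
    assertTrue(latch.await(30, TimeUnit.SECONDS));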
@@ -240,6 +251,9 @@ public class LongGCDisruptionTests extends ESTestCase {
         } finally {
             stop.set(true);
             pauseUnderLock.countDown();
+            for (final Thread thread : threads) {
+                thread.join();
+            }
         }
     }
 }