diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java index 98349086df5..45acde09325 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/LongGCDisruption.java @@ -106,6 +106,15 @@ public class LongGCDisruption extends SingleNodeDisruption { logger.warn("failed to stop node [{}]'s threads within [{}] millis. Stopping thread stack trace:\n {}" , disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread.getStackTrace())); stoppingThread.interrupt(); // best effort; + try { + /* + * We need to join on the stopping thread in case it has stopped a thread that is in a critical section and needs to + * be resumed. + */ + stoppingThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } throw new RuntimeException("stopping node threads took too long"); } // block detection checks if other threads are blocked waiting on an object that is held by one @@ -228,23 +237,39 @@ public class LongGCDisruption extends SingleNodeDisruption { if (thread.isAlive() && nodeThreads.add(thread)) { liveThreadsFound = true; logger.trace("stopping thread [{}]", threadName); - thread.suspend(); - // double check the thread is not in a shared resource like logging. If so, let it go and come back.. - boolean safe = true; - safe: - for (StackTraceElement stackElement : thread.getStackTrace()) { - String className = stackElement.getClassName(); - for (Pattern unsafePattern : getUnsafeClasses()) { - if (unsafePattern.matcher(className).find()) { - safe = false; - break safe; + // we assume it is not safe to suspend the thread + boolean safe = false; + try { + /* + * At the bottom of this try-block we will know whether or not it is safe to suspend this thread; we start by + * assuming that it is safe. 
+ */ + boolean definitelySafe = true; + thread.suspend(); + // double check the thread is not in a shared resource like logging; if so, let it go and come back + safe: + for (StackTraceElement stackElement : thread.getStackTrace()) { + String className = stackElement.getClassName(); + for (Pattern unsafePattern : getUnsafeClasses()) { + if (unsafePattern.matcher(className).find()) { + // it is definitely not safe to suspend the thread + definitelySafe = false; + break safe; + } } } - } - if (!safe) { - logger.trace("resuming thread [{}] as it is in a critical section", threadName); - thread.resume(); - nodeThreads.remove(thread); + safe = definitelySafe; + } finally { + if (!safe) { + /* + * Do not log before resuming as we might be interrupted while logging in which case we will throw an + * interrupted exception and never resume the stopped thread that is in a critical section. Also, logging before + * resuming makes for confusing log messages if we never hit the resume. + */ + thread.resume(); + logger.trace("resumed thread [{}] as it is in a critical section", threadName); + nodeThreads.remove(thread); + } } } } diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/LongGCDisruptionTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/LongGCDisruptionTests.java index 48bd18986c2..147a6df608a 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/LongGCDisruptionTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/LongGCDisruptionTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.test.ESTestCase; import java.lang.management.ThreadInfo; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,8 +71,8 @@ public class LongGCDisruptionTests extends ESTestCase { final CountDownLatch pauseUnderLock = 
new CountDownLatch(1); final LockedExecutor lockedExecutor = new LockedExecutor(); final AtomicLong ops = new AtomicLong(); + final Thread[] threads = new Thread[10]; try { - Thread[] threads = new Thread[10]; for (int i = 0; i < 10; i++) { // at least one locked and one none lock thread final boolean lockedExec = (i < 9 && randomBoolean()) || i == 0; @@ -101,6 +103,9 @@ public class LongGCDisruptionTests extends ESTestCase { } finally { stop.set(true); pauseUnderLock.countDown(); + for (final Thread thread : threads) { + thread.join(); + } } } @@ -121,8 +126,8 @@ public class LongGCDisruptionTests extends ESTestCase { final AtomicBoolean stop = new AtomicBoolean(); final LockedExecutor lockedExecutor = new LockedExecutor(); final AtomicLong ops = new AtomicLong(); + final Thread[] threads = new Thread[10]; try { - Thread[] threads = new Thread[10]; for (int i = 0; i < 10; i++) { threads[i] = new Thread(() -> { for (int iter = 0; stop.get() == false; iter++) { @@ -150,6 +155,9 @@ public class LongGCDisruptionTests extends ESTestCase { assertBusy(() -> assertThat(ops.get(), greaterThan(first))); } finally { stop.set(true); + for (final Thread thread : threads) { + thread.join(); + } } } @@ -183,6 +191,7 @@ public class LongGCDisruptionTests extends ESTestCase { final CountDownLatch pauseUnderLock = new CountDownLatch(1); final LockedExecutor lockedExecutor = new LockedExecutor(); final AtomicLong ops = new AtomicLong(); + final List<Thread> threads = new ArrayList<>(); try { for (int i = 0; i < 5; i++) { // at least one locked and one none lock thread @@ -206,6 +215,7 @@ public class LongGCDisruptionTests extends ESTestCase { }); thread.setName("[" + disruptedNodeName + "][" + i + "]"); + threads.add(thread); thread.start(); } @@ -224,12 +234,13 @@ public class LongGCDisruptionTests extends ESTestCase { } }); thread.setName("[" + blockedNodeName + "][" + i + "]"); + threads.add(thread); thread.start(); } // make sure some threads of test_node are under lock 
underLock.await(); disruption.startDisrupting(); - waitForBlockDetectionResult.await(30, TimeUnit.SECONDS); + assertTrue(waitForBlockDetectionResult.await(30, TimeUnit.SECONDS)); disruption.stopDisrupting(); ThreadInfo threadInfo = blockDetectionResult.get(); @@ -240,6 +251,9 @@ public class LongGCDisruptionTests extends ESTestCase { } finally { stop.set(true); pauseUnderLock.countDown(); + for (final Thread thread : threads) { + thread.join(); + } } } }