mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-27 18:38:41 +00:00
Fix inconsistencies in long GC disruption
This commit fixes some inconsistencies in long GC disruption where we mixed stopping and suspending when the action we are performing on threads is suspending which is distinct from stopping a thread.
This commit is contained in:
parent
7f9d84cb1a
commit
74acc594a9
@ -78,7 +78,7 @@ public class IntermittentLongGCDisruption extends LongGCDisruption {
|
||||
logger.info("node [{}] goes into GC for for [{}]", disruptedNode, duration);
|
||||
final Set<Thread> nodeThreads = new HashSet<>();
|
||||
try {
|
||||
while (stopNodeThreads(nodeThreads)) ;
|
||||
while (suspendThreads(nodeThreads)) ;
|
||||
if (!nodeThreads.isEmpty()) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
|
@ -42,9 +42,9 @@ import java.util.stream.Collectors;
|
||||
public class LongGCDisruption extends SingleNodeDisruption {
|
||||
|
||||
private static final Pattern[] unsafeClasses = new Pattern[]{
|
||||
// logging has shared JVM locks - we may suspend a thread and block other nodes from doing their thing
|
||||
// logging has shared JVM locks; we may suspend a thread and block other nodes from doing their thing
|
||||
Pattern.compile("logging\\.log4j"),
|
||||
// security manager is shared across all nodes AND it uses synced hashmaps interanlly
|
||||
// security manager is shared across all nodes and it uses synchronized maps internally
|
||||
Pattern.compile("java\\.lang\\.SecurityManager"),
|
||||
// SecureRandom instance from SecureRandomHolder class is shared by all nodes
|
||||
Pattern.compile("java\\.security\\.SecureRandom")
|
||||
@ -74,48 +74,48 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
// we spawn a background thread to protect against deadlock which can happen
|
||||
// if there are shared resources between caller thread and and suspended threads
|
||||
// see unsafeClasses to how to avoid that
|
||||
final AtomicReference<Exception> stoppingError = new AtomicReference<>();
|
||||
final Thread stoppingThread = new Thread(new AbstractRunnable() {
|
||||
final AtomicReference<Exception> suspendingError = new AtomicReference<>();
|
||||
final Thread suspendingThread = new Thread(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
stoppingError.set(e);
|
||||
suspendingError.set(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
// keep trying to stop threads, until no new threads are discovered.
|
||||
while (stopNodeThreads(suspendedThreads)) {
|
||||
// keep trying to suspend threads, until no new threads are discovered.
|
||||
while (suspendThreads(suspendedThreads)) {
|
||||
if (Thread.interrupted()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
stoppingThread.setName(currentThreadName + "[LongGCDisruption][threadStopper]");
|
||||
stoppingThread.start();
|
||||
suspendingThread.setName(currentThreadName + "[LongGCDisruption][threadSuspender]");
|
||||
suspendingThread.start();
|
||||
try {
|
||||
stoppingThread.join(getStoppingTimeoutInMillis());
|
||||
suspendingThread.join(getSuspendingTimeoutInMillis());
|
||||
} catch (InterruptedException e) {
|
||||
stoppingThread.interrupt(); // best effort to signal stopping
|
||||
suspendingThread.interrupt(); // best effort to signal suspending
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (stoppingError.get() != null) {
|
||||
throw new RuntimeException("unknown error while stopping threads", stoppingError.get());
|
||||
if (suspendingError.get() != null) {
|
||||
throw new RuntimeException("unknown error while suspending threads", suspendingError.get());
|
||||
}
|
||||
if (stoppingThread.isAlive()) {
|
||||
logger.warn("failed to stop node [{}]'s threads within [{}] millis. Stopping thread stack trace:\n {}"
|
||||
, disruptedNode, getStoppingTimeoutInMillis(), stackTrace(stoppingThread.getStackTrace()));
|
||||
stoppingThread.interrupt(); // best effort;
|
||||
if (suspendingThread.isAlive()) {
|
||||
logger.warn("failed to suspend node [{}]'s threads within [{}] millis. Suspending thread stack trace:\n {}"
|
||||
, disruptedNode, getSuspendingTimeoutInMillis(), stackTrace(suspendingThread.getStackTrace()));
|
||||
suspendingThread.interrupt(); // best effort;
|
||||
try {
|
||||
/*
|
||||
* We need to join on the stopping thread in case it has stopped a thread that is in a critical section and needs to
|
||||
* be resumed.
|
||||
* We need to join on the suspending thread in case it has suspended a thread that is in a critical section and
|
||||
* needs to be resumed.
|
||||
*/
|
||||
stoppingThread.join();
|
||||
suspendingThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
throw new RuntimeException("stopping node threads took too long");
|
||||
throw new RuntimeException("suspending node threads took too long");
|
||||
}
|
||||
// block detection checks if other threads are blocked waiting on an object that is held by one
|
||||
// of the threads that was suspended
|
||||
@ -190,7 +190,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
if (blockDetectionThread != null) {
|
||||
try {
|
||||
blockDetectionThread.interrupt(); // best effort
|
||||
blockDetectionThread.join(getStoppingTimeoutInMillis());
|
||||
blockDetectionThread.join(getSuspendingTimeoutInMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -216,9 +216,9 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
* returns true if some live threads were found. The caller is expected to call this method
|
||||
* until no more "live" are found.
|
||||
*/
|
||||
@SuppressWarnings("deprecation") // stops/resumes threads intentionally
|
||||
@SuppressForbidden(reason = "stops/resumes threads intentionally")
|
||||
protected boolean stopNodeThreads(Set<Thread> nodeThreads) {
|
||||
@SuppressWarnings("deprecation") // suspends/resumes threads intentionally
|
||||
@SuppressForbidden(reason = "suspends/resumes threads intentionally")
|
||||
protected boolean suspendThreads(Set<Thread> nodeThreads) {
|
||||
Thread[] allThreads = null;
|
||||
while (allThreads == null) {
|
||||
allThreads = new Thread[Thread.activeCount()];
|
||||
@ -236,12 +236,12 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
if (isDisruptedNodeThread(threadName)) {
|
||||
if (thread.isAlive() && nodeThreads.add(thread)) {
|
||||
liveThreadsFound = true;
|
||||
logger.trace("stopping thread [{}]", threadName);
|
||||
logger.trace("suspending thread [{}]", threadName);
|
||||
// we assume it is not safe to suspend the thread
|
||||
boolean safe = false;
|
||||
try {
|
||||
/*
|
||||
* At the bottom of this try-block we will know whether or not it is safe to suspend this thread; we start by
|
||||
* At the bottom of this try-block we will know whether or not it is safe to suspend the thread; we start by
|
||||
* assuming that it is safe.
|
||||
*/
|
||||
boolean definitelySafe = true;
|
||||
@ -263,8 +263,8 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
if (!safe) {
|
||||
/*
|
||||
* Do not log before resuming as we might be interrupted while logging in which case we will throw an
|
||||
* interrupted exception and never resume the stopped thread that is in a critical section. Also, logging before
|
||||
* resuming makes for confusing log messages if we never hit the resume.
|
||||
* interrupted exception and never resume the suspended thread that is in a critical section. Also, logging
|
||||
* before resuming makes for confusing log messages if we never hit the resume.
|
||||
*/
|
||||
thread.resume();
|
||||
logger.trace("resumed thread [{}] as it is in a critical section", threadName);
|
||||
@ -283,7 +283,7 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
}
|
||||
|
||||
// for testing
|
||||
protected long getStoppingTimeoutInMillis() {
|
||||
protected long getSuspendingTimeoutInMillis() {
|
||||
return TimeValue.timeValueSeconds(30).getMillis();
|
||||
}
|
||||
|
||||
@ -309,8 +309,8 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
||||
"Stack trace of blocking thread: " + blockingThreadStackTrace);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // stops/resumes threads intentionally
|
||||
@SuppressForbidden(reason = "stops/resumes threads intentionally")
|
||||
@SuppressWarnings("deprecation") // suspends/resumes threads intentionally
|
||||
@SuppressForbidden(reason = "suspends/resumes threads intentionally")
|
||||
protected void resumeThreads(Set<Thread> threads) {
|
||||
for (Thread thread : threads) {
|
||||
thread.resume();
|
||||
|
@ -62,7 +62,7 @@ public class LongGCDisruptionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long getStoppingTimeoutInMillis() {
|
||||
protected long getSuspendingTimeoutInMillis() {
|
||||
return 100;
|
||||
}
|
||||
};
|
||||
@ -99,7 +99,7 @@ public class LongGCDisruptionTests extends ESTestCase {
|
||||
// make sure some threads are under lock
|
||||
underLock.await();
|
||||
RuntimeException e = expectThrows(RuntimeException.class, disruption::startDisrupting);
|
||||
assertThat(e.getMessage(), containsString("stopping node threads took too long"));
|
||||
assertThat(e.getMessage(), containsString("suspending node threads took too long"));
|
||||
} finally {
|
||||
stop.set(true);
|
||||
pauseUnderLock.countDown();
|
||||
|
Loading…
x
Reference in New Issue
Block a user